Compare commits
11 Commits
master
...
dynamic-br
Author | SHA1 | Date |
---|---|---|
|
2760985f1e | 4 years ago |
|
90566d3f9e | 4 years ago |
|
63d400dc50 | 4 years ago |
|
8491fd4a0c | 4 years ago |
|
cd3b588601 | 4 years ago |
|
2b866904aa | 5 years ago |
|
1f6fa8d771 | 5 years ago |
|
efa0ff6908 | 5 years ago |
|
360d9be7d8 | 5 years ago |
|
7f9b017552 | 5 years ago |
|
b14eafd5ec | 5 years ago |
@ -0,0 +1,584 @@
|
||||
/*
|
||||
Copyright (c) 2015-2020 Tifaifai Maupiti <tifaifai.maupiti@gmail.com>
|
||||
|
||||
All rights reserved. This program and the accompanying materials
|
||||
are made available under the terms of the Eclipse Public License v1.0
|
||||
and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||
|
||||
The Eclipse Public License is available at
|
||||
http://www.eclipse.org/legal/epl-v10.html
|
||||
and the Eclipse Distribution License is available at
|
||||
http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
|
||||
Contributors:
|
||||
Tifaifai Maupiti - initial implementation and documentation.
|
||||
*/
|
||||
#include "config.h"
|
||||
|
||||
#ifdef WITH_CJSON
|
||||
# include <cjson/cJSON.h>
|
||||
#endif
|
||||
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#ifndef WIN32
|
||||
#include <signal.h>
|
||||
#include <sys/time.h>
|
||||
#include <time.h>
|
||||
#else
|
||||
#include <process.h>
|
||||
#include <winsock2.h>
|
||||
#define snprintf sprintf_s
|
||||
#endif
|
||||
|
||||
#include <mqtt_protocol.h>
|
||||
#include <mosquitto.h>
|
||||
#include "client_shared.h"
|
||||
#include "pub_shared.h"
|
||||
|
||||
/* Global variables for use in callbacks. See sub_client.c for an example of
|
||||
* using a struct to hold variables for use in callbacks. */
|
||||
struct mosquitto *mosq = NULL;
|
||||
bool process_messages = true;
|
||||
|
||||
static int last_mid_sent = -1;
|
||||
static bool disconnect_sent = false;
|
||||
static int publish_count = 0;
|
||||
static volatile int status = STATUS_CONNECTING;
|
||||
|
||||
static char *topic = NULL;
|
||||
static char *name = NULL;
|
||||
static char *pattern = NULL;
|
||||
static char *direction = NULL;
|
||||
static char *local_prefix = NULL;
|
||||
static char *remote_prefix = NULL;
|
||||
static char *remote_add = NULL;
|
||||
static int remote_port;
|
||||
static int qos = 0;
|
||||
static char *msg = NULL;
|
||||
static char *msg_json = NULL;
|
||||
|
||||
void * ptrMosq = NULL;
|
||||
int process_run = 1;
|
||||
int nb_line = 0;
|
||||
|
||||
struct bridge{
|
||||
char *name;
|
||||
};
|
||||
|
||||
struct bridge_list{
|
||||
int bridge_list_count;
|
||||
struct bridge* bridge;
|
||||
};
|
||||
|
||||
struct bridge_list *bridges = NULL;
|
||||
|
||||
int show_bridges(struct bridge_list* bridges)
|
||||
{
|
||||
int i;
|
||||
char * bridge_name;
|
||||
uint32_t bridge_name_len;
|
||||
|
||||
for(i=0; i<nb_line ;i++)
|
||||
{
|
||||
printf("\33[2K");
|
||||
printf("\033[A");
|
||||
}
|
||||
|
||||
nb_line = 0;
|
||||
|
||||
for(i=0;i<bridges->bridge_list_count;i++)
|
||||
{
|
||||
bridge_name_len = (uint32_t)strlen(bridges->bridge[i].name) - (uint32_t)strlen("/state") - (uint32_t)strlen("$SYS/broker/connection/");
|
||||
bridge_name = malloc(bridge_name_len*sizeof(char));
|
||||
memset(bridge_name, 0, bridge_name_len);
|
||||
memcpy(bridge_name, bridges->bridge[i].name + (uint32_t)strlen("$SYS/broker/connection/")*sizeof(char) , bridge_name_len);
|
||||
printf("%s\n",bridge_name);
|
||||
fflush(stdout);
|
||||
free(bridge_name);
|
||||
nb_line++;
|
||||
}
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
int show_bridges_json(struct bridge_list* bridges){
|
||||
int rc;
|
||||
|
||||
#ifndef WITH_CJSON
|
||||
rc = show_bridges(bridges);
|
||||
return rc;
|
||||
#endif
|
||||
|
||||
int i;
|
||||
char * bridge_name;
|
||||
uint32_t bridge_name_len;
|
||||
char *string_json = NULL;
|
||||
cJSON *bridges_json = NULL;
|
||||
cJSON *bridge_json = NULL;
|
||||
cJSON *connection_json = NULL;
|
||||
|
||||
if(cfg.bridge_conf_json == CONF_JSON){
|
||||
cJSON *s_bridges_json = cJSON_CreateObject();
|
||||
if(s_bridges_json == NULL) {
|
||||
cJSON_Delete(s_bridges_json);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
bridges_json = cJSON_CreateArray();
|
||||
if(bridges_json == NULL) {
|
||||
cJSON_Delete(s_bridges_json);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
cJSON_AddItemToObject(s_bridges_json, "bridges", bridges_json);
|
||||
for(i=0;i<bridges->bridge_list_count;i++)
|
||||
{
|
||||
bridge_json = cJSON_CreateObject();
|
||||
if(bridge_json == NULL){
|
||||
cJSON_Delete(s_bridges_json);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
cJSON_AddItemToArray(bridges_json, bridge_json);
|
||||
bridge_name_len = (uint32_t)strlen(bridges->bridge[i].name) - (uint32_t)strlen("/state") - (uint32_t)strlen("$SYS/broker/connection/");
|
||||
bridge_name = malloc(bridge_name_len*sizeof(char));
|
||||
memset(bridge_name, 0, bridge_name_len);
|
||||
memcpy(bridge_name, bridges->bridge[i].name + (uint32_t)strlen("$SYS/broker/connection/")*sizeof(char) , bridge_name_len);
|
||||
connection_json = cJSON_CreateString(bridge_name);
|
||||
if(connection_json == NULL){
|
||||
cJSON_Delete(s_bridges_json);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
cJSON_AddItemToObject(bridge_json, "connection", connection_json);
|
||||
free(bridge_name);
|
||||
}
|
||||
string_json = cJSON_PrintUnformatted(s_bridges_json);
|
||||
if(string_json == NULL){
|
||||
printf("Error, Failed to print show_bridges.\n");
|
||||
}
|
||||
printf("%s\n",string_json);
|
||||
cJSON_Delete(s_bridges_json);
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}else{
|
||||
rc = show_bridges(bridges);
|
||||
return rc;
|
||||
}
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message, const mosquitto_property *properties)
|
||||
{
|
||||
UNUSED(properties);
|
||||
struct bridge_list *bridges;
|
||||
bridges = (struct bridge_list*) obj;
|
||||
int valid_erase = 0;
|
||||
int i;
|
||||
|
||||
if(!strcmp(message->payload,"1")){
|
||||
bridges->bridge_list_count++;
|
||||
bridges->bridge = realloc(bridges->bridge, (size_t)bridges->bridge_list_count*sizeof(struct bridge));
|
||||
if(!bridges->bridge){
|
||||
printf("Error: Out of memory. 1\n");
|
||||
exit(-1);
|
||||
}
|
||||
bridges->bridge[bridges->bridge_list_count-1].name = malloc(strlen(message->topic)*sizeof(char));
|
||||
if(!bridges->bridge[bridges->bridge_list_count-1].name){
|
||||
printf("Error: Out of memory. 2\n");
|
||||
exit(-1);
|
||||
}
|
||||
bridges->bridge[bridges->bridge_list_count-1].name = strdup(message->topic);
|
||||
show_bridges_json(bridges);
|
||||
}else{
|
||||
if(bridges->bridge_list_count>0){
|
||||
for (i = 0; i < bridges->bridge_list_count; i++) {
|
||||
if(!strcmp(bridges->bridge[i].name,message->topic)){
|
||||
valid_erase = i;
|
||||
bridges->bridge_list_count--;
|
||||
}
|
||||
}
|
||||
|
||||
if(valid_erase){
|
||||
for (i = valid_erase; i < bridges->bridge_list_count; i++) {
|
||||
bridges->bridge[i] = bridges->bridge[i+1];
|
||||
}
|
||||
bridges->bridge = realloc(bridges->bridge, (size_t)bridges->bridge_list_count*sizeof(struct bridge));
|
||||
if(!bridges->bridge){
|
||||
printf("Error: Out of memory. 3\n");
|
||||
exit(-1);
|
||||
}
|
||||
show_bridges_json(bridges);
|
||||
}
|
||||
}else{
|
||||
bridges->bridge_list_count = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc, const mosquitto_property *properties)
|
||||
{
|
||||
UNUSED(mosq);
|
||||
UNUSED(obj);
|
||||
UNUSED(rc);
|
||||
UNUSED(properties);
|
||||
|
||||
if(rc == 0){
|
||||
status = STATUS_DISCONNECTED;
|
||||
}
|
||||
}
|
||||
|
||||
int my_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, void *payload, int qos, bool retain)
|
||||
{
|
||||
return mosquitto_publish_v5(mosq, mid, topic, payloadlen, payload, qos, retain, cfg.publish_props);
|
||||
}
|
||||
|
||||
void my_connect_callback_pub(struct mosquitto *mosq, void *obj, int result, int flags, const mosquitto_property *properties)
|
||||
{
|
||||
UNUSED(obj);
|
||||
UNUSED(flags);
|
||||
UNUSED(properties);
|
||||
int rc = MOSQ_ERR_SUCCESS;
|
||||
|
||||
if(!result){
|
||||
rc = my_publish(mosq, &mid_sent, cfg.topic, cfg.msglen, cfg.message, cfg.qos, cfg.retain);
|
||||
if(rc){
|
||||
switch(rc){
|
||||
case MOSQ_ERR_INVAL:
|
||||
err_printf(&cfg, "Error: Invalid input. Does your topic contain '+' or '#'?\n");
|
||||
break;
|
||||
case MOSQ_ERR_NOMEM:
|
||||
err_printf(&cfg, "Error: Out of memory when trying to publish message.\n");
|
||||
break;
|
||||
case MOSQ_ERR_NO_CONN:
|
||||
err_printf(&cfg, "Error: Client not connected when trying to publish.\n");
|
||||
break;
|
||||
case MOSQ_ERR_PROTOCOL:
|
||||
err_printf(&cfg, "Error: Protocol error when communicating with broker.\n");
|
||||
break;
|
||||
case MOSQ_ERR_PAYLOAD_SIZE:
|
||||
err_printf(&cfg, "Error: Message payload is too large.\n");
|
||||
break;
|
||||
case MOSQ_ERR_QOS_NOT_SUPPORTED:
|
||||
err_printf(&cfg, "Error: Message QoS not supported on broker, try a lower QoS.\n");
|
||||
break;
|
||||
}
|
||||
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
|
||||
}
|
||||
}else{
|
||||
if(result){
|
||||
if(cfg.protocol_version == MQTT_PROTOCOL_V5){
|
||||
if(result == MQTT_RC_UNSUPPORTED_PROTOCOL_VERSION){
|
||||
err_printf(&cfg, "Connection error: %s. Try connecting to an MQTT v5 broker, or use MQTT v3.x mode.\n", mosquitto_reason_string(result));
|
||||
}else{
|
||||
err_printf(&cfg, "Connection error: %s\n", mosquitto_reason_string(result));
|
||||
}
|
||||
}else{
|
||||
err_printf(&cfg, "Connection error: %s\n", mosquitto_connack_string(result));
|
||||
}
|
||||
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void my_publish_callback(struct mosquitto *mosq, void *obj, int mid, int reason_code, const mosquitto_property *properties)
|
||||
{
|
||||
UNUSED(obj);
|
||||
UNUSED(properties);
|
||||
|
||||
last_mid_sent = mid;
|
||||
if(reason_code > 127){
|
||||
err_printf(&cfg, "Warning: Publish %d failed: %s.\n", mid, mosquitto_reason_string(reason_code));
|
||||
}
|
||||
publish_count++;
|
||||
if(disconnect_sent == false){
|
||||
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
|
||||
disconnect_sent = true;
|
||||
}
|
||||
}
|
||||
|
||||
void my_connect_callback_sub(struct mosquitto *mosq, void *obj, int result, int flags, const mosquitto_property *properties)
|
||||
{
|
||||
UNUSED(obj);
|
||||
UNUSED(flags);
|
||||
UNUSED(properties);
|
||||
|
||||
if(!result){
|
||||
mosquitto_subscribe(mosq, NULL, "$SYS/broker/connection/#", 0);
|
||||
}
|
||||
}
|
||||
|
||||
void my_subscribe_callback(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos)
|
||||
{
|
||||
UNUSED(obj);
|
||||
int i;
|
||||
|
||||
if(cfg.debug){
|
||||
if(!cfg.quiet) printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
|
||||
for(i=1; i<qos_count; i++){
|
||||
if(!cfg.quiet) printf(", %d", granted_qos[i]);
|
||||
}
|
||||
if(!cfg.quiet) printf("\n");
|
||||
}
|
||||
|
||||
if(cfg.exit_after_sub){
|
||||
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
|
||||
}
|
||||
}
|
||||
|
||||
int bridge_shared_init(void)
|
||||
{
|
||||
bridges = malloc(sizeof(struct bridge_list));
|
||||
if(!bridges){
|
||||
printf("Error: Out of memory.\n");
|
||||
return 1;
|
||||
}
|
||||
bridges->bridge_list_count = 0;
|
||||
ptrMosq = bridges;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int pub_loop(struct mosquitto *mosq)
|
||||
{
|
||||
int rc;
|
||||
int loop_delay = 1000;
|
||||
|
||||
do{
|
||||
rc = mosquitto_loop(mosq, loop_delay, 1);
|
||||
}while(rc == MOSQ_ERR_SUCCESS);
|
||||
|
||||
if(status == STATUS_DISCONNECTED){
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}else{
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
|
||||
void bridge_shared_cleanup(void)
|
||||
{
|
||||
free(bridges);
|
||||
}
|
||||
|
||||
void print_usage(void)
|
||||
{
|
||||
int major, minor, revision;
|
||||
|
||||
mosquitto_lib_version(&major, &minor, &revision);
|
||||
printf("mosquitto_bridger is a simple mqtt client that will publish a message on a single topic on one broker to manage bridge dynamiclly with another and exit.\n");
|
||||
printf("mosquitto_bridge version %s running on libmosquitto %d.%d.%d.\n\n", VERSION, major, minor, revision);
|
||||
printf("Usage: mosquitto_bridge [-h local host] [-p local port] [-q qos] -a remote host -P remote port -t bridge pattern -D bridge direction -l local prefix -r remote prefix\n");
|
||||
printf("\n");
|
||||
printf("mosquitto_bridge --help\n\n");
|
||||
printf(" -j | --json : JSON message configuration\n");
|
||||
printf(" -a | --address : address configuration\n");
|
||||
printf(" -c | --connection : connection configuration\n");
|
||||
printf(" -d | --del : delete bridge dynamic\n");
|
||||
printf(" -D | --direction : direction configuration : in, out or both\n");
|
||||
printf(" -h | --host : local Mosquitto host bridge dynamic compatible broker to make new/del bridge.\n");
|
||||
printf(" -k | --know : know actif bridges on local broker\n");
|
||||
printf(" -l | --local : local prefix configuration\n");
|
||||
printf(" -n | --new : new bridge dynamic\n");
|
||||
printf(" -p | --port : network port to connect to local. Defaults to 1883.\n");
|
||||
printf(" -P | --pw : provide a password (requires MQTT 3.1 broker)\n");
|
||||
printf(" -q | --qos : quality of service level to use for all messages. Defaults to 0.\n");
|
||||
printf(" -r | --remote : remote prefix configuration\n");
|
||||
printf(" -R | --remotePort : network port to connect to remote. no defaults\n");
|
||||
printf(" -u | --username : provide a username (requires MQTT 3.1 broker)\n");
|
||||
printf(" --help : display this message.\n");
|
||||
}
|
||||
|
||||
#ifndef WIN32
|
||||
void my_signal_handler(int signum)
|
||||
{
|
||||
if(signum == SIGALRM || signum == SIGTERM || signum == SIGINT){
|
||||
process_run = 0;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
int rc;
|
||||
#ifndef WIN32
|
||||
struct sigaction sigact;
|
||||
#endif
|
||||
int msg_len, msg_json_len;
|
||||
|
||||
mosquitto_lib_init();
|
||||
|
||||
rc = client_config_load_bridge(&cfg, CLIENT_PUB, argc, argv);
|
||||
if(rc){
|
||||
if(rc == 2){
|
||||
/* --help */
|
||||
print_usage();
|
||||
}else{
|
||||
fprintf(stderr, "\nUse 'mosquitto_bridge(2) --help' to see usage.\n");
|
||||
}
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if(cfg.know_bridge_connection){
|
||||
if(bridge_shared_init()) goto cleanup;
|
||||
}
|
||||
if(cfg.bridgeType == BRIDGE_NEW){
|
||||
topic = strdup("$BRIDGE/new");
|
||||
name = cfg.bridge.name;
|
||||
pattern = cfg.bridge.topics[0].topic;
|
||||
qos = cfg.bridge.topics[0].qos;
|
||||
if(cfg.bridge.topics[0].direction == bd_out){
|
||||
direction = strdup("out");
|
||||
}else if(cfg.bridge.topics[0].direction == bd_in){
|
||||
direction = strdup("in");
|
||||
}else if(cfg.bridge.topics[0].direction == bd_both){
|
||||
direction = strdup("both");
|
||||
}
|
||||
local_prefix = cfg.bridge.topics[0].local_prefix;
|
||||
remote_prefix = cfg.bridge.topics[0].remote_prefix;
|
||||
remote_add = cfg.bridge.addresses[0].address;
|
||||
remote_port = cfg.bridge.addresses[0].port;
|
||||
if(cfg.bridge_conf_json == CONF_JSON){
|
||||
msg_json_len = snprintf(NULL,0,"{\"bridges\":[{\"connection\":\"%s\",\"addresses\":[{\"address\":\"%s\",\"port\":%d}],\"topic\":\"%s\",\"direction\":\"%s\",\"qos\":%d,\"local_prefix\":\"%s\",\"remote_prefix\":\"%s\"}]}",name
|
||||
,remote_add,remote_port,pattern,direction,qos,local_prefix,remote_prefix);
|
||||
msg_json_len++;
|
||||
msg_json = (char*) malloc((size_t)msg_json_len);
|
||||
snprintf(msg_json,(size_t)msg_json_len,"{\"bridges\":[{\"connection\":\"%s\",\"addresses\":[{\"address\":\"%s\",\"port\":%d}],\"topic\":\"%s\",\"direction\":\"%s\",\"qos\":%d,\"local_prefix\":\"%s\",\"remote_prefix\":\"%s\"}]}",name
|
||||
,remote_add,remote_port,pattern,direction,qos,local_prefix,remote_prefix);
|
||||
cfg.message = strdup(msg_json);
|
||||
cfg.msglen = msg_json_len;
|
||||
}else{
|
||||
msg_len = snprintf(NULL,0,"connection %s\naddress %s:%d\ntopic %s %s %d %s %s",name
|
||||
,remote_add
|
||||
,remote_port
|
||||
,pattern
|
||||
,direction
|
||||
,qos
|
||||
,local_prefix
|
||||
,remote_prefix);
|
||||
msg_len++;
|
||||
msg = (char*) malloc((size_t)msg_len);
|
||||
snprintf(msg,(size_t)msg_len,"connection %s\naddress %s:%d\ntopic %s %s %d %s %s",name
|
||||
,remote_add
|
||||
,remote_port
|
||||
,pattern
|
||||
,direction
|
||||
,qos
|
||||
,local_prefix
|
||||
,remote_prefix);
|
||||
cfg.message = strdup(msg);
|
||||
cfg.msglen = msg_len;
|
||||
}
|
||||
cfg.topic = strdup(topic);
|
||||
printf("Message New Bridge (%d):\n%s\n", cfg.msglen, cfg.message);
|
||||
}else if(cfg.bridgeType == BRIDGE_DEL){
|
||||
topic = strdup("$BRIDGE/del");
|
||||
name = cfg.bridge.name;
|
||||
if(cfg.bridge_conf_json == CONF_JSON){
|
||||
msg_json_len = snprintf(NULL,0,"{\"connection\":\"%s\"}", name);
|
||||
msg_json_len++;
|
||||
msg_json = (char*) malloc((size_t)msg_json_len);
|
||||
snprintf(msg_json,(size_t)msg_json_len,"{\"connection\":\"%s\"}", name);
|
||||
cfg.message = strdup(msg_json);
|
||||
cfg.msglen = msg_json_len;
|
||||
}else{
|
||||
msg_len = snprintf(NULL,0,"connection %s",name);
|
||||
msg_len++;
|
||||
msg = (char*) malloc((size_t)msg_len);
|
||||
snprintf(msg,(size_t)msg_len,"connection %s",name);
|
||||
cfg.message = strdup(msg);
|
||||
cfg.msglen = msg_len;
|
||||
}
|
||||
cfg.topic = strdup(topic);
|
||||
printf("Message Del Bridge (%d):\n%s\n", cfg.msglen, cfg.message);
|
||||
}
|
||||
|
||||
if(client_id_generate(&cfg)){
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
mosq = mosquitto_new(cfg.id, cfg.clean_session, ptrMosq);
|
||||
if(!mosq){
|
||||
switch(errno){
|
||||
case ENOMEM:
|
||||
err_printf(&cfg, "Error: Out of memory.\n");
|
||||
break;
|
||||
case EINVAL:
|
||||
err_printf(&cfg, "Error: Invalid id.\n");
|
||||
break;
|
||||
}
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
mosquitto_disconnect_v5_callback_set(mosq, my_disconnect_callback);
|
||||
if(!cfg.know_bridge_connection){
|
||||
mosquitto_connect_v5_callback_set(mosq, my_connect_callback_pub);
|
||||
mosquitto_publish_v5_callback_set(mosq, my_publish_callback);
|
||||
}
|
||||
|
||||
if(client_opts_set(mosq, &cfg)){
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
rc = client_connect(mosq, &cfg);
|
||||
if(rc){
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
#ifndef WIN32
|
||||
sigact.sa_handler = my_signal_handler;
|
||||
sigemptyset(&sigact.sa_mask);
|
||||
sigact.sa_flags = 0;
|
||||
|
||||
if(sigaction(SIGALRM, &sigact, NULL) == -1){
|
||||
perror("sigaction");
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if(sigaction(SIGTERM, &sigact, NULL) == -1){
|
||||
perror("sigaction");
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if(sigaction(SIGINT, &sigact, NULL) == -1){
|
||||
perror("sigaction");
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if(cfg.timeout){
|
||||
alarm(cfg.timeout);
|
||||
}
|
||||
#endif
|
||||
|
||||
if(cfg.know_bridge_connection){
|
||||
mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
|
||||
mosquitto_connect_v5_callback_set(mosq, my_connect_callback_sub);
|
||||
mosquitto_message_v5_callback_set(mosq, my_message_callback);
|
||||
|
||||
mosquitto_loop_start(mosq);
|
||||
while (process_run) {
|
||||
pause();
|
||||
}
|
||||
mosquitto_disconnect(mosq);
|
||||
mosquitto_loop_stop(mosq,false);
|
||||
} else {
|
||||
rc = pub_loop(mosq);
|
||||
}
|
||||
|
||||
mosquitto_destroy(mosq);
|
||||
mosquitto_lib_cleanup();
|
||||
//client_config_cleanup(&cfg);
|
||||
if(cfg.know_bridge_connection){
|
||||
bridge_shared_cleanup();
|
||||
}
|
||||
|
||||
if(rc){
|
||||
err_printf(&cfg, "Error: %s\n", mosquitto_strerror(rc));
|
||||
}
|
||||
return rc;
|
||||
|
||||
cleanup:
|
||||
mosquitto_lib_cleanup();
|
||||
//client_config_cleanup(&cfg);
|
||||
if(cfg.know_bridge_connection){
|
||||
bridge_shared_cleanup();
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
@ -0,0 +1,215 @@
|
||||
'\" t
|
||||
.\" Title: mosquitto_bridge
|
||||
.\" Author: [see the "Author" section]
|
||||
.\" Generator: DocBook XSL Stylesheets v1.79.1 <http://docbook.sf.net/>
|
||||
.\" Date: 12/03/2020
|
||||
.\" Manual: Commands
|
||||
.\" Source: Mosquitto Project
|
||||
.\" Language: English
|
||||
.\"
|
||||
.TH "MOSQUITTO_BRIDGE" "1" "12/03/2020" "Mosquitto Project" "Commands"
|
||||
.\" -----------------------------------------------------------------
|
||||
.\" * Define some portability stuff
|
||||
.\" -----------------------------------------------------------------
|
||||
.\" ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
.\" http://bugs.debian.org/507673
|
||||
.\" http://lists.gnu.org/archive/html/groff/2009-02/msg00013.html
|
||||
.\" ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
.ie \n(.g .ds Aq \(aq
|
||||
.el .ds Aq '
|
||||
.\" -----------------------------------------------------------------
|
||||
.\" * set default formatting
|
||||
.\" -----------------------------------------------------------------
|
||||
.\" disable hyphenation
|
||||
.nh
|
||||
.\" disable justification (adjust text to left margin only)
|
||||
.ad l
|
||||
.\" -----------------------------------------------------------------
|
||||
.\" * MAIN CONTENT STARTS HERE *
|
||||
.\" -----------------------------------------------------------------
|
||||
.SH "NAME"
|
||||
mosquitto_bridge \- an MQTT client for creating, deleting and knowing bridges dynamically\&.
|
||||
.SH "SYNOPSIS"
|
||||
.HP \w'\fBmosquitto_bridge\fR\ 'u
|
||||
\fBmosquitto_bridge\fR [\-a | \-\-address] [\-c | \-\-connection] [\-d | \-\-del] [\-D | \-\-direction] [\-h | \-\-host] [\-k | \-\-know] [\-l | \-\-local] [\-n | \-\-new] [\-p | \-\-port] [\-P | \-\-pw] [\-q | \-\-qos] [\-r | \-\-remote] [\-R | \-\-remotePort] [\-u | \-\-username]
|
||||
.HP \w'\fBmosquitto_bridge\fR\ 'u
|
||||
\fBmosquitto_bridge\fR [\fB\-\-help\fR]
|
||||
.SH "DESCRIPTION"
|
||||
.PP
|
||||
\fBmosquitto_bridge\fR
|
||||
is a simple MQTT client that will create or delete a bridge dynamically for Mosquitto broker 1\&.4\&.90 or newer without shutdown or SIGHUP signal\&.
|
||||
.PP
|
||||
\fBmosquitto_bridge\fR
|
||||
also allows to know active bridges on a broker
|
||||
.SH "OPTIONS"
|
||||
.PP
|
||||
The options below may be given on the command line\&.
|
||||
.PP
|
||||
\fB\-a\fR, \fB\-\-address\fR
|
||||
.RS 4
|
||||
Define bridge address of bridge\&. Necessary only with
|
||||
\fB\-n\fR\&.
|
||||
.RE
|
||||
.PP
|
||||
\fB\-c\fR, \fB\-\-connection\fR
|
||||
.RS 4
|
||||
Define the connection name of the bridge\&.
|
||||
.RE
|
||||
.PP
|
||||
\fB\-d\fR, \fB\-\-del\fR
|
||||
.RS 4
|
||||
Delete a bridge which name is define by
|
||||
\fB\-c\fR\&. Can\*(Aqt be use with
|
||||
\fB\-n\fR\&.
|
||||
.RE
|
||||
.PP
|
||||
\fB\-D\fR, \fB\-\-directoin\fR
|
||||
.RS 4
|
||||
Define direction of the bridge [ in | out | both ]\&.
|
||||
.RE
|
||||
.PP
|
||||
\fB\-h\fR, \fB\-\-host\fR
|
||||
.RS 4
|
||||
Define the network host to connect to the local broker where the bridge will be create/delete or to know active bridges\&. Defaults to localhost\&.
|
||||
.RE
|
||||
.PP
|
||||
\fB\-k\fR, \fB\-\-know\fR
|
||||
.RS 4
|
||||
Know all bridges which are active in a broker\&. Can only be used with options other than
|
||||
\fB\-h\fR
|
||||
and
|
||||
\fB\-p\fR\&.
|
||||
.RE
|
||||
.PP
|
||||
\fB\-l\fR, \fB\-\-local\fR
|
||||
.RS 4
|
||||
Define the local prefix for the bridge configuration\&.
|
||||
.RE
|
||||
.PP
|
||||
\fB\-n\fR, \fB\-\-new\fR
|
||||
.RS 4
|
||||
Define to create a bridge\&.
|
||||
.RE
|
||||
.PP
|
||||
\fB\-p\fR, \fB\-\-port\fR
|
||||
.RS 4
|
||||
Define the network port to connect to the local broker where the bridge will be create/delete or to know active bridges\&. Defaults to 1883\&.
|
||||
.RE
|
||||
.PP
|
||||
\fB\-P\fR, \fB\-\-pw\fR
|
||||
.RS 4
|
||||
Provide a password to be used for authenticating with the broker\&. Using this argument without also specifying a username is invalid\&. This requires a broker that supports MQTT v3\&.1\&. See also the
|
||||
\fB\-\-username\fR
|
||||
option\&.
|
||||
.RE
|
||||
.PP
|
||||
\fB\-q\fR, \fB\-\-qos\fR
|
||||
.RS 4
|
||||
Specify the quality of service desired for the bridge between local broker and remote broker, from 0, 1 and 2\&. Defaults to 0\&. See
|
||||
\fBmqtt\fR(7)
|
||||
for more information on QoS\&.
|
||||
.RE
|
||||
.PP
|
||||
\fB\-r\fR, \fB\-\-remote\fR
|
||||
.RS 4
|
||||
Define the remote prefix for the bridge configuration\&.
|
||||
.RE
|
||||
.PP
|
||||
\fB\-R\fR, \fB\-\-remotePort\fR
|
||||
.RS 4
|
||||
Define the network port to connect to the remote broker\&. No default value\&.
|
||||
.RE
|
||||
.PP
|
||||
\fB\-u\fR, \fB\-\-username\fR
|
||||
.RS 4
|
||||
Provide a username to be used for authenticating with the broker\&. This requires a broker that supports MQTT v3\&.1\&. See also the
|
||||
\fB\-\-pw\fR
|
||||
argument\&.
|
||||
.RE
|
||||
.SH "EXAMPLES"
|
||||
.PP
|
||||
Note that these really are examples\&.
|
||||
.PP
|
||||
Three possiblites are available :
|
||||
.PP
|
||||
\- Create a new bridge from a local broker to a remote broker whith
|
||||
\-n
|
||||
and all necessary parameters of a bridge (connection, address, topic)\&.
|
||||
.PP
|
||||
\- Delete a bridge present on a local bridge with
|
||||
\-d
|
||||
and bridge connection name\&.
|
||||
.PP
|
||||
\- Know all bridges which are active on a local broker with
|
||||
\-k\&.
|
||||
.PP
|
||||
Creat a bridge on localhost and default port with another broker on localhost and 1884 port:
|
||||
.sp
|
||||
.RS 4
|
||||
.ie n \{\
|
||||
\h'-04'\(bu\h'+03'\c
|
||||
.\}
|
||||
.el \{\
|
||||
.sp -1
|
||||
.IP \(bu 2.3
|
||||
.\}
|
||||
mosquitto_bridge
|
||||
\-c
|
||||
testBridge
|
||||
\-a
|
||||
127\&.0\&.0\&.1
|
||||
\-R
|
||||
1884
|
||||
\-n
|
||||
\-t
|
||||
\e#
|
||||
\-q
|
||||
0
|
||||
\-l
|
||||
local/
|
||||
\-r
|
||||
remote/
|
||||
\-D
|
||||
both
|
||||
.RE
|
||||
.PP
|
||||
Delete a bridge on localhost and default port:
|
||||
.sp
|
||||
.RS 4
|
||||
.ie n \{\
|
||||
\h'-04'\(bu\h'+03'\c
|
||||
.\}
|
||||
.el \{\
|
||||
.sp -1
|
||||
.IP \(bu 2.3
|
||||
.\}
|
||||
mosquitto_bridge
|
||||
\-c
|
||||
testBridge
|
||||
\-d
|
||||
.RE
|
||||
.PP
|
||||
Know all active bridges on localhost and default port:
|
||||
.sp
|
||||
.RS 4
|
||||
.ie n \{\
|
||||
\h'-04'\(bu\h'+03'\c
|
||||
.\}
|
||||
.el \{\
|
||||
.sp -1
|
||||
.IP \(bu 2.3
|
||||
.\}
|
||||
mosquitto_bridge
|
||||
\-k
|
||||
.RE
|
||||
.SH "BUGS"
|
||||
.PP
|
||||
\fBmosquitto\fR
|
||||
bug information can be found at
|
||||
\m[blue]\fB\%https://github.com/eclipse/mosquitto/issues\fR\m[]
|
||||
.SH "SEE ALSO"
|
||||
\fBmqtt\fR(7), \fBmosquitto_pub\fR(1), \fBmosquitto_sub\fR(1), \fBmosquitto\fR(8), \fBlibmosquitto\fR(3), \fBmosquitto-tls\fR(7)
|
||||
.SH "AUTHOR"
|
||||
.PP
|
||||
Tifaifai Maupiti
|
||||
<tifaifai\&.maupiti@gmail\&.com>
|
@ -0,0 +1,5 @@
|
||||
.. title: mosquitto_bridge man page
|
||||
.. slug: mosquitto_bridge-1
|
||||
.. category: man
|
||||
.. type: man
|
||||
.. pretty_url: False
|
@ -0,0 +1,292 @@
|
||||
<?xml version='1.0' encoding='UTF-8'?>
|
||||
<?xml-stylesheet type="text/xsl" href="manpage.xsl"?>
|
||||
|
||||
<refentry xml:id="mosquitto_bridge" xmlns:xlink="http://www.w3.org/1999/xlink">
|
||||
<refmeta>
|
||||
<refentrytitle>mosquitto_bridge</refentrytitle>
|
||||
<manvolnum>1</manvolnum>
|
||||
<refmiscinfo class="source">Mosquitto Project</refmiscinfo>
|
||||
<refmiscinfo class="manual">Commands</refmiscinfo>
|
||||
</refmeta>
|
||||
|
||||
<refnamediv>
|
||||
<refname>mosquitto_bridge</refname>
|
||||
<refpurpose>an MQTT client for creating, deleting and knowing bridges dynamically.</refpurpose>
|
||||
</refnamediv>
|
||||
|
||||
<refsynopsisdiv>
|
||||
<cmdsynopsis>
|
||||
<command>mosquitto_bridge</command>
|
||||
<group>
|
||||
<arg choice='plain'>-a</arg>
|
||||
<arg choice='plain'>--address</arg>
|
||||
</group>
|
||||
<group>
|
||||
<arg choice='plain'>-c</arg>
|
||||
<arg choice='plain'>--connection</arg>
|
||||
</group>
|
||||
<group>
|
||||
<arg choice='plain'>-d</arg>
|
||||
<arg choice='plain'>--del</arg>
|
||||
</group>
|
||||
<group>
|
||||
<arg choice='plain'>-D</arg>
|
||||
<arg choice='plain'>--direction</arg>
|
||||
</group>
|
||||
<group>
|
||||
<arg choice='plain'>-h</arg>
|
||||
<arg choice='plain'>--host</arg>
|
||||
</group>
|
||||
<group>
|
||||
<arg choice='plain'>-k</arg>
|
||||
<arg choice='plain'>--know</arg>
|
||||
</group>
|
||||
<group>
|
||||
<arg choice='plain'>-l</arg>
|
||||
<arg choice='plain'>--local</arg>
|
||||
</group>
|
||||
<group>
|
||||
<arg choice='plain'>-n</arg>
|
||||
<arg choice='plain'>--new</arg>
|
||||
</group>
|
||||
<group>
|
||||
<arg choice='plain'>-p</arg>
|
||||
<arg choice='plain'>--port</arg>
|
||||
</group>
|
||||
<group>
|
||||
<arg choice='plain'>-P</arg>
|
||||
<arg choice='plain'>--pw</arg>
|
||||
</group>
|
||||
<group>
|
||||
<arg choice='plain'>-q</arg>
|
||||
<arg choice='plain'>--qos</arg>
|
||||
</group>
|
||||
<group>
|
||||
<arg choice='plain'>-r</arg>
|
||||
<arg choice='plain'>--remote</arg>
|
||||
</group>
|
||||
<group>
|
||||
<arg choice='plain'>-R</arg>
|
||||
<arg choice='plain'>--remotePort</arg>
|
||||
</group>
|
||||
<group>
|
||||
<arg choice='plain'>-u</arg>
|
||||
<arg choice='plain'>--username</arg>
|
||||
</group>
|
||||
</cmdsynopsis>
|
||||
<cmdsynopsis>
|
||||
<command>mosquitto_bridge</command>
|
||||
<group choice='plain'>
|
||||
<arg><option>--help</option></arg>
|
||||
</group>
|
||||
</cmdsynopsis>
|
||||
</refsynopsisdiv>
|
||||
|
||||
<refsect1>
|
||||
<title>Description</title>
|
||||
<para><command>mosquitto_bridge</command> is a simple MQTT
|
||||
client that will create or delete a bridge dynamically for Mosquitto broker
|
||||
1.4.90 or newer without shutdown or SIGHUP signal.</para>
|
||||
<para><command>mosquitto_bridge</command> also allows to know active bridges on a broker</para>
|
||||
</refsect1>
|
||||
|
||||
<refsect1>
|
||||
<title>Options</title>
|
||||
<para>The options below may be given on the command line.</para>
|
||||
<variablelist>
|
||||
<varlistentry>
|
||||
<term><option>-a</option></term>
|
||||
<term><option>--address</option></term>
|
||||
<listitem>
|
||||
<para>Define bridge address of bridge.
|
||||
Necessary only with <option>-n</option>.</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
<varlistentry>
|
||||
<term><option>-c</option></term>
|
||||
<term><option>--connection</option></term>
|
||||
<listitem>
|
||||
<para>Define the connection name of the bridge.</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
<varlistentry>
|
||||
<term><option>-d</option></term>
|
||||
<term><option>--del</option></term>
|
||||
<listitem>
|
||||
<para>Delete a bridge which name is define by
|
||||
<option>-c</option>. Can't be use with <option>-n</option>.</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
<varlistentry>
|
||||
<term><option>-D</option></term>
|
||||
<term><option>--directoin</option></term>
|
||||
<listitem>
|
||||
<para>Define direction of the bridge [ in | out | both ].</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
<varlistentry>
|
||||
<term><option>-h</option></term>
|
||||
<term><option>--host</option></term>
|
||||
<listitem>
|
||||
<para>Define the network host to connect to the local
|
||||
broker where the bridge will be create/delete or to
|
||||
know active bridges. Defaults to localhost.</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
<varlistentry>
|
||||
<term><option>-k</option></term>
|
||||
<term><option>--know</option></term>
|
||||
<listitem>
|
||||
<para>Know all bridges which are active in a broker.
|
||||
Can only be used with options other than
|
||||
<option>-h</option> and <option>-p</option>.</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
<varlistentry>
|
||||
<term><option>-l</option></term>
|
||||
<term><option>--local</option></term>
|
||||
<listitem>
|
||||
<para>Define the local prefix for the bridge configuration.</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
<varlistentry>
|
||||
<term><option>-n</option></term>
|
||||
<term><option>--new</option></term>
|
||||
<listitem>
|
||||
<para>Define to create a bridge.</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
<varlistentry>
|
||||
<term><option>-p</option></term>
|
||||
<term><option>--port</option></term>
|
||||
<listitem>
|
||||
<para>Define the network port to connect to the local
|
||||
broker where the bridge will be create/delete or to
|
||||
know active bridges. Defaults to 1883.</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
<varlistentry>
|
||||
<term><option>-P</option></term>
|
||||
<term><option>--pw</option></term>
|
||||
<listitem>
|
||||
<para>Provide a password to be used for authenticating with
|
||||
the broker. Using this argument without also specifying a
|
||||
username is invalid. This requires a broker that supports
|
||||
MQTT v3.1. See also the <option>--username</option> option.</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
<varlistentry>
|
||||
<term><option>-q</option></term>
|
||||
<term><option>--qos</option></term>
|
||||
<listitem>
|
||||
<para>Specify the quality of service desired for the
|
||||
bridge between local broker and remote broker, from 0, 1 and 2. Defaults to 0. See
|
||||
<citerefentry><refentrytitle>mqtt</refentrytitle><manvolnum>7</manvolnum></citerefentry>
|
||||
for more information on QoS.</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
<varlistentry>
|
||||
<term><option>-r</option></term>
|
||||
<term><option>--remote</option></term>
|
||||
<listitem>
|
||||
<para>Define the remote prefix for the bridge configuration.</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
<varlistentry>
|
||||
<term><option>-R</option></term>
|
||||
<term><option>--remotePort</option></term>
|
||||
<listitem>
|
||||
<para>Define the network port to connect to the remote broker. No default value.</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
<varlistentry>
|
||||
<term><option>-u</option></term>
|
||||
<term><option>--username</option></term>
|
||||
<listitem>
|
||||
<para>Provide a username to be used for authenticating with
|
||||
the broker. This requires a broker that supports MQTT v3.1.
|
||||
See also the <option>--pw</option> argument.</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
</variablelist>
|
||||
</refsect1>
|
||||
|
||||
<refsect1>
|
||||
<title>Examples</title>
|
||||
<para>Note that these really are examples.</para>
|
||||
<para>Three possiblites are available :</para>
|
||||
<para>- Create a new bridge from a local broker to a remote broker whith
|
||||
<literal>-n</literal> and all necessary parameters of a bridge (connection, address, topic).</para>
|
||||
<para>- Delete a bridge present on a local bridge with <literal>-d</literal> and bridge connection name.</para>
|
||||
<para>- Know all bridges which are active on a local broker with <literal>-k</literal>.</para>
|
||||
<para>Creat a bridge on localhost and default port with another broker on localhost and 1884 port:</para>
|
||||
<itemizedlist mark="circle">
|
||||
<listitem><para>mosquitto_bridge <literal>-c</literal> testBridge
|
||||
<literal>-a</literal> 127.0.0.1 <literal>-R</literal> 1884 <literal>-n</literal>
|
||||
<literal>-t</literal> \# <literal>-q</literal> 0 <literal>-l</literal> local/
|
||||
<literal>-r</literal> remote/ <literal>-D</literal> both</para></listitem>
|
||||
</itemizedlist>
|
||||
<para>Delete a bridge on localhost and default port:</para>
|
||||
<itemizedlist mark="circle">
|
||||
<listitem><para>mosquitto_bridge <literal>-c</literal> testBridge
|
||||
<literal>-d</literal></para></listitem>
|
||||
</itemizedlist>
|
||||
<para>Know all active bridges on localhost and default port:</para>
|
||||
<itemizedlist mark="circle">
|
||||
<listitem><para>mosquitto_bridge <literal>-k</literal></para></listitem>
|
||||
</itemizedlist>
|
||||
</refsect1>
|
||||
|
||||
<refsect1>
|
||||
<title>Bugs</title>
|
||||
<para><command>mosquitto</command> bug information can be found at
|
||||
<ulink url="https://github.com/eclipse/mosquitto/issues"/></para>
|
||||
</refsect1>
|
||||
|
||||
<refsect1>
|
||||
<title>See Also</title>
|
||||
<simplelist type="inline">
|
||||
<member>
|
||||
<citerefentry>
|
||||
<refentrytitle><link xlink:href="mqtt-7.html">mqtt</link></refentrytitle>
|
||||
<manvolnum>7</manvolnum>
|
||||
</citerefentry>
|
||||
</member>
|
||||
<member>
|
||||
<citerefentry>
|
||||
<refentrytitle><link xlink:href="mosquitto_pub-1.html">mosquitto_pub</link></refentrytitle>
|
||||
<manvolnum>1</manvolnum>
|
||||
</citerefentry>
|
||||
</member>
|
||||
<member>
|
||||
<citerefentry>
|
||||
<refentrytitle><link xlink:href="mosquitto_sub-1.html">mosquitto_sub</link></refentrytitle>
|
||||
<manvolnum>1</manvolnum>
|
||||
</citerefentry>
|
||||
</member>
|
||||
<member>
|
||||
<citerefentry>
|
||||
<refentrytitle><link xlink:href="mosquitto-8.html">mosquitto</link></refentrytitle>
|
||||
<manvolnum>8</manvolnum>
|
||||
</citerefentry>
|
||||
</member>
|
||||
<member>
|
||||
<citerefentry>
|
||||
<refentrytitle><link xlink:href="libmosquitto-3.html">libmosquitto</link></refentrytitle>
|
||||
<manvolnum>3</manvolnum>
|
||||
</citerefentry>
|
||||
</member>
|
||||
<member>
|
||||
<citerefentry>
|
||||
<refentrytitle><link xlink:href="mosquitto-tls-7.html">mosquitto-tls</link></refentrytitle>
|
||||
<manvolnum>7</manvolnum>
|
||||
</citerefentry>
|
||||
</member>
|
||||
</simplelist>
|
||||
</refsect1>
|
||||
|
||||
<refsect1>
|
||||
<title>Author</title>
|
||||
<para>Tifaifai Maupiti <email>tifaifai.maupiti@gmail.com</email></para>
|
||||
</refsect1>
|
||||
</refentry>
|
@ -0,0 +1,904 @@
|
||||
/*
|
||||
Copyright (c) 2017-2020 Tifaifai Maupiti <tifaifai.maupiti@gmail.com>
|
||||
|
||||
All rights reserved. This program and the accompanying materials
|
||||
are made available under the terms of the Eclipse Public License v1.0
|
||||
and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||
|
||||
Contributor:
|
||||
Tifaifai Maupiti - Initial implementation and documentation.
|
||||
*/
|
||||
#define _POSIX_C_SOURCE 200809L
|
||||
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
|
||||
#ifdef WITH_EPOLL
|
||||
#include <sys/epoll.h>
|
||||
#endif
|
||||
|
||||
#ifdef WITH_CJSON
|
||||
# include <cjson/cJSON.h>
|
||||
#endif
|
||||
|
||||
#ifndef WIN32
|
||||
#include <unistd.h>
|
||||
#include <strings.h>
|
||||
#else
|
||||
#include <process.h>
|
||||
#include <winsock2.h>
|
||||
#define snprintf sprintf_s
|
||||
#define strncasecmp _strnicmp
|
||||
#endif
|
||||
|
||||
#include "mosquitto_broker_internal.h"
|
||||
#include "mqtt_protocol.h"
|
||||
#include "memory_mosq.h"
|
||||
#include "read_handle.h"
|
||||
#include "send_mosq.h"
|
||||
#include "util_mosq.h"
|
||||
|
||||
static int config__check(struct mosquitto__config *config);
|
||||
|
||||
int bridge__dynamic_analyse(struct mosquitto_db *db, char *topic, void* payload, uint32_t payloadlen)
|
||||
{
|
||||
int rc;
|
||||
int *index;
|
||||
|
||||
struct mosquitto__config config;
|
||||
config__init(&config);
|
||||
|
||||
index = (int*) mosquitto__malloc(sizeof(int));
|
||||
*index = -1;
|
||||
|
||||
if(strncmp("$BRIDGE/new",topic, 11)==0){
|
||||
rc = bridge__dynamic_parse_payload_new_json(db, payload, &config);
|
||||
if(rc != 0){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Unable to parse PUBLISH for bridge dynamic.");
|
||||
mosquitto__free(index);
|
||||
return MOSQ_ERR_BRIDGE_DYNA;
|
||||
}
|
||||
rc = config__check(&config);
|
||||
if(rc != 0){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Unable to parse PUBLISH.");
|
||||
mosquitto__free(index);
|
||||
return MOSQ_ERR_BRIDGE_DYNA;
|
||||
}
|
||||
bridge__new(&(config.bridges[config.bridge_count-1]));
|
||||
if(rc != 0){
|
||||
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Unable to connect to bridge %s.",
|
||||
config.bridges[config.bridge_count-1].name);
|
||||
mosquitto__free(index);
|
||||
return MOSQ_ERR_BRIDGE_DYNA;
|
||||
}else{
|
||||
log__printf(NULL, MOSQ_LOG_WARNING, "Information : Start connection with bridge %s.",
|
||||
config.bridges[config.bridge_count-1].name);
|
||||
mux__add_in(db->bridges[db->bridge_count-1]);
|
||||
mux__add_out(db->bridges[db->bridge_count-1]);
|
||||
}
|
||||
}else if(strncmp("$BRIDGE/del", topic, 11)==0){
|
||||
rc = bridge__dynamic_parse_payload_del_json(payload,db,index);
|
||||
if(rc != 0){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Unable to parse PUBLISH for bridge dynamic.");
|
||||
mosquitto__free(index);
|
||||
return MOSQ_ERR_BRIDGE_DYNA;
|
||||
}
|
||||
|
||||
if(*index == -1){
|
||||
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Unknow bridge name.");
|
||||
mosquitto__free(index);
|
||||
return MOSQ_ERR_BRIDGE_DYNA;
|
||||
}
|
||||
|
||||
if(bridge__del(db, *index)){
|
||||
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Unable to remove bridge %s.",
|
||||
config.bridges[*index].name);
|
||||
mosquitto__free(index);
|
||||
return MOSQ_ERR_BRIDGE_DYNA;
|
||||
}
|
||||
}
|
||||
|
||||
mosquitto__free(index);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int bridge__dynamic_parse_payload_new_json(struct mosquitto_db *db, void* payload, struct mosquitto__config *config)
|
||||
{
|
||||
int i;
|
||||
int len;
|
||||
struct mosquitto__bridge *cur_bridge = NULL;
|
||||
struct mosquitto__bridge_topic *cur_topic;
|
||||
|
||||
#ifndef WITH_CJSON
|
||||
return bridge__dynamic_parse_payload_new(db, payload, config);
|
||||
#endif
|
||||
|
||||
cJSON *message_json = cJSON_Parse(payload);
|
||||
if(message_json == NULL){
|
||||
const char *error_ptr = cJSON_GetErrorPtr();
|
||||
if(error_ptr != NULL){
|
||||
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Unable to parse JSON Message for bridge dynamic. Maybe normal message configuration. %s", error_ptr);
|
||||
}
|
||||
cJSON_Delete(message_json);
|
||||
return bridge__dynamic_parse_payload_new(db, payload, config);
|
||||
}
|
||||
|
||||
const cJSON *bridges_json = NULL;
|
||||
const cJSON *addresses_json = NULL;
|
||||
int bridges_count_json = 0;
|
||||
int addresses_count_json = 0;
|
||||
|
||||
bridges_json = cJSON_GetObjectItemCaseSensitive(message_json, "bridges");
|
||||
if(!cJSON_IsArray(bridges_json)) {
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
bridges_count_json = cJSON_GetArraySize(bridges_json);
|
||||
log__printf(NULL, MOSQ_LOG_DEBUG, "Information : %d bridge(s) dynamic.", bridges_count_json);
|
||||
if(bridges_count_json == 0){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: None Bridge in configuration.");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
const cJSON *bridge_json = NULL;
|
||||
// Actually, just one bridge defined in configuration bridges. Work in progress...
|
||||
int index = 0;
|
||||
bridge_json = cJSON_GetArrayItem(bridges_json, index);
|
||||
|
||||
const cJSON *connection_json = NULL;
|
||||
const cJSON *topic_json = NULL;
|
||||
const cJSON *direction_json = NULL;
|
||||
const cJSON *qos_json= NULL;
|
||||
const cJSON *local_prefix_json = NULL;
|
||||
const cJSON *remote_prefix_json = NULL;
|
||||
const cJSON *remote_username = NULL;
|
||||
const cJSON *try_private = NULL;
|
||||
const cJSON *notification_topic = NULL;
|
||||
const cJSON *remote_clientid = NULL;
|
||||
|
||||
connection_json = cJSON_GetObjectItemCaseSensitive(bridge_json, "connection");
|
||||
if(cJSON_IsString(connection_json) && (connection_json->valuestring != NULL)) {
|
||||
/* Check for existing bridge name. */
|
||||
for(i=0; i<db->bridge_count; i++){
|
||||
if(!strcmp(db->bridges[i]->bridge->name, connection_json->valuestring)){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Duplicate bridge name \"%s\".", connection_json->valuestring);
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
}
|
||||
config->bridge_count++;
|
||||
config->bridges = mosquitto__realloc(config->bridges, (size_t)config->bridge_count*sizeof(struct mosquitto__bridge));
|
||||
if(!config->bridges){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
cur_bridge = &(config->bridges[config->bridge_count-1]);
|
||||
memset(cur_bridge, 0, sizeof(struct mosquitto__bridge));
|
||||
cur_bridge->name = mosquitto__strdup(connection_json->valuestring);
|
||||
if(!cur_bridge->name){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
cur_bridge->keepalive = 60;
|
||||
cur_bridge->notifications = true;
|
||||
cur_bridge->notifications_local_only = false;
|
||||
cur_bridge->start_type = bst_automatic;
|
||||
cur_bridge->idle_timeout = 60;
|
||||
cur_bridge->restart_timeout = 0;
|
||||
cur_bridge->backoff_base = 5;
|
||||
cur_bridge->backoff_cap = 30;
|
||||
cur_bridge->threshold = 10;
|
||||
cur_bridge->try_private = true;
|
||||
cur_bridge->attempt_unsubscribe = true;
|
||||
cur_bridge->protocol_version = mosq_p_mqtt311;
|
||||
cur_bridge->primary_retry_sock = INVALID_SOCKET;
|
||||
}
|
||||
|
||||
addresses_json = cJSON_GetObjectItemCaseSensitive(bridge_json, "addresses");
|
||||
if(!cJSON_IsArray(addresses_json)) {
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration (addresses).");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
addresses_count_json = cJSON_GetArraySize(addresses_json);
|
||||
if(addresses_count_json == 0){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: None address in bridge configuration.");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
const cJSON *addressItem_json = NULL;
|
||||
cJSON_ArrayForEach(addressItem_json, addresses_json) {
|
||||
cJSON *address_json = cJSON_GetObjectItemCaseSensitive(addressItem_json, "address");
|
||||
cJSON *port_json = cJSON_GetObjectItemCaseSensitive(addressItem_json, "port");
|
||||
|
||||
if(cJSON_IsString(address_json) && (address_json->valuestring != NULL)) {
|
||||
if(!cur_bridge || cur_bridge->addresses){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
cur_bridge->address_count++;
|
||||
cur_bridge->addresses = mosquitto__realloc(cur_bridge->addresses, (size_t)cur_bridge->address_count*sizeof(struct bridge_address));
|
||||
if(!cur_bridge->addresses){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
cur_bridge->addresses[cur_bridge->address_count-1].address = mosquitto__strdup(address_json->valuestring);
|
||||
}
|
||||
|
||||
if(cJSON_IsNumber(port_json)){
|
||||
if(port_json->valueint < 1 || port_json->valueint > UINT16_MAX){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid port value (%d).", port_json->valueint);
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
cur_bridge->addresses[cur_bridge->address_count-1].port = (uint16_t)port_json->valueint;
|
||||
}
|
||||
}
|
||||
|
||||
topic_json = cJSON_GetObjectItemCaseSensitive(bridge_json, "topic");
|
||||
if(cJSON_IsString(topic_json) && (topic_json->valuestring != NULL)) {
|
||||
cur_bridge->topic_count++;
|
||||
cur_bridge->topics = mosquitto__realloc(cur_bridge->topics,
|
||||
sizeof(struct mosquitto__bridge_topic)*(size_t)cur_bridge->topic_count);
|
||||
if(!cur_bridge->topics){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
cur_topic = &cur_bridge->topics[cur_bridge->topic_count-1];
|
||||
if(!strcmp(topic_json->valuestring, "\"\"")){
|
||||
cur_topic->topic = NULL;
|
||||
}else{
|
||||
cur_topic->topic = mosquitto__strdup(topic_json->valuestring);
|
||||
if(!cur_topic->topic){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
}
|
||||
cur_topic->direction = bd_out;
|
||||
cur_topic->qos = 0;
|
||||
cur_topic->local_prefix = NULL;
|
||||
cur_topic->remote_prefix = NULL;
|
||||
}else{
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty topic value in configuration.");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
direction_json = cJSON_GetObjectItemCaseSensitive(bridge_json, "direction");
|
||||
if(cJSON_IsString(direction_json) && (direction_json->valuestring != NULL)) {
|
||||
if(!strcasecmp(direction_json->valuestring, "out")){
|
||||
cur_topic->direction = bd_out;
|
||||
}else if(!strcasecmp(direction_json->valuestring, "in")){
|
||||
cur_topic->direction = bd_in;
|
||||
}else if(!strcasecmp(direction_json->valuestring, "both")){
|
||||
cur_topic->direction = bd_both;
|
||||
}else{
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic direction '%s'.", direction_json->valuestring);
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
}
|
||||
qos_json = cJSON_GetObjectItemCaseSensitive(bridge_json, "qos");
|
||||
if(cJSON_IsNumber(qos_json)){
|
||||
if(qos_json->valueint < 0 || qos_json->valueint > 2){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge QoS level '%d'.", qos_json->valueint);
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
cur_topic->qos = (uint8_t)qos_json->valueint;
|
||||
}
|
||||
local_prefix_json = cJSON_GetObjectItemCaseSensitive(bridge_json, "local_prefix");
|
||||
if(cJSON_IsString(local_prefix_json) && (local_prefix_json->valuestring != NULL)) {
|
||||
cur_bridge->topic_remapping = true;
|
||||
if(!strcmp(local_prefix_json->valuestring, "\"\"")){
|
||||
cur_topic->local_prefix = NULL;
|
||||
}else{
|
||||
if(mosquitto_pub_topic_check(local_prefix_json->valuestring) != MOSQ_ERR_SUCCESS){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic local prefix '%s'.", local_prefix_json->valuestring);
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
cur_topic->local_prefix = mosquitto__strdup(local_prefix_json->valuestring);
|
||||
if(!cur_topic->local_prefix){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
}
|
||||
}
|
||||
remote_prefix_json = cJSON_GetObjectItemCaseSensitive(bridge_json, "remote_prefix");
|
||||
if(cJSON_IsString(remote_prefix_json) && (remote_prefix_json->valuestring != NULL)) {
|
||||
if(!strcmp(remote_prefix_json->valuestring, "\"\"")){
|
||||
cur_topic->remote_prefix = NULL;
|
||||
}else{
|
||||
if(mosquitto_pub_topic_check(remote_prefix_json->valuestring) != MOSQ_ERR_SUCCESS){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic remote prefix '%s'.", remote_prefix_json->valuestring);
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
cur_topic->remote_prefix = mosquitto__strdup(remote_prefix_json->valuestring);
|
||||
if(!cur_topic->remote_prefix){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
}
|
||||
}
|
||||
remote_username = cJSON_GetObjectItemCaseSensitive(bridge_json, "remote_username");
|
||||
if(cJSON_IsString(remote_username) && (remote_username->valuestring != NULL)) {
|
||||
if(!strcmp(remote_username->valuestring, "\"\"")){
|
||||
cur_bridge->remote_username = NULL;
|
||||
}else{
|
||||
cur_bridge->remote_username = mosquitto__strdup(remote_username->valuestring);
|
||||
if(!cur_bridge->remote_username){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
}
|
||||
}
|
||||
try_private = cJSON_GetObjectItemCaseSensitive(bridge_json, "try_private");
|
||||
if(cJSON_IsBool(try_private)){
|
||||
cur_bridge->try_private = cJSON_IsTrue(try_private) ? true : false;
|
||||
}
|
||||
notification_topic = cJSON_GetObjectItemCaseSensitive(bridge_json, "notification_topic");
|
||||
if(cJSON_IsString(notification_topic) && (notification_topic->valuestring != NULL)) {
|
||||
if(!strcmp(notification_topic->valuestring, "\"\"")){
|
||||
cur_bridge->notification_topic = NULL;
|
||||
}else{
|
||||
cur_bridge->notification_topic = mosquitto__strdup(notification_topic->valuestring);
|
||||
if(!cur_bridge->notification_topic){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
}
|
||||
}
|
||||
remote_clientid = cJSON_GetObjectItemCaseSensitive(bridge_json, "remote_clientid");
|
||||
if(cJSON_IsString(remote_clientid) && (remote_clientid->valuestring != NULL)) {
|
||||
if(!strcmp(remote_clientid->valuestring, "\"\"")){
|
||||
cur_bridge->remote_clientid = NULL;
|
||||
}else{
|
||||
cur_bridge->remote_clientid = mosquitto__strdup(remote_clientid->valuestring);
|
||||
if(!cur_bridge->remote_clientid){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//Last verification
|
||||
if(cur_bridge->address_count == 0){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty address value in configuration.");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
if(config->bridge_count == 0){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty connection value in configuration.");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
if(cur_topic->topic == NULL &&
|
||||
(cur_topic->local_prefix == NULL || cur_topic->remote_prefix == NULL)){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge remapping.");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
|
||||
if(cur_topic->local_prefix){
|
||||
if(cur_topic->topic){
|
||||
len = (int)strlen(cur_topic->topic) + (int)strlen(cur_topic->local_prefix)+1;
|
||||
cur_topic->local_topic = mosquitto__malloc((size_t)len+1);
|
||||
if(!cur_topic->local_topic){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
snprintf(cur_topic->local_topic, (size_t)len+1, "%s%s", cur_topic->local_prefix, cur_topic->topic);
|
||||
cur_topic->local_topic[len] = '\0';
|
||||
}else{
|
||||
cur_topic->local_topic = mosquitto__strdup(cur_topic->local_prefix);
|
||||
if(!cur_topic->local_topic){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
}
|
||||
}else{
|
||||
cur_topic->local_topic = mosquitto__strdup(cur_topic->topic);
|
||||
if(!cur_topic->local_topic){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
}
|
||||
|
||||
if(cur_topic->remote_prefix){
|
||||
if(cur_topic->topic){
|
||||
len = (int)strlen(cur_topic->topic) + (int)strlen(cur_topic->remote_prefix)+1;
|
||||
cur_topic->remote_topic = mosquitto__malloc((size_t)len+1);
|
||||
if(!cur_topic->remote_topic){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
snprintf(cur_topic->remote_topic, (size_t)len, "%s%s", cur_topic->remote_prefix, cur_topic->topic);
|
||||
cur_topic->remote_topic[len] = '\0';
|
||||
}else{
|
||||
cur_topic->remote_topic = mosquitto__strdup(cur_topic->remote_prefix);
|
||||
if(!cur_topic->remote_topic){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
}
|
||||
}else{
|
||||
cur_topic->remote_topic = mosquitto__strdup(cur_topic->topic);
|
||||
if(!cur_topic->remote_topic){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
}
|
||||
cJSON_Delete(message_json);
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
int bridge__dynamic_parse_payload_new(struct mosquitto_db *db, void* payload, struct mosquitto__config *config)
|
||||
{
|
||||
char *buf = NULL;
|
||||
char *token;
|
||||
int tmp_int;
|
||||
char *saveptr = NULL;
|
||||
struct mosquitto__bridge *cur_bridge = NULL;
|
||||
struct mosquitto__bridge_topic *cur_topic;
|
||||
|
||||
char *address;
|
||||
int i;
|
||||
int len;
|
||||
int nb_param = 0;
|
||||
|
||||
if(!payload) return MOSQ_ERR_INVAL;
|
||||
|
||||
buf = strtok(payload, "\n");
|
||||
|
||||
while(buf) {
|
||||
if(buf[0] != '#' && buf[0] != 10 && buf[0] != 13){
|
||||
while(buf[strlen(buf)-1] == 10 || buf[strlen(buf)-1] == 13){
|
||||
buf[strlen(buf)-1] = 0;
|
||||
}
|
||||
token = strtok_r(buf, " ", &saveptr);
|
||||
|
||||
if(token)
|
||||
{
|
||||
if(!strcmp(token, "address") || !strcmp(token, "addresses"))
|
||||
{
|
||||
nb_param ++;
|
||||
if(!cur_bridge || cur_bridge->addresses){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
while((token = strtok_r(NULL, " ", &saveptr))){
|
||||
cur_bridge->address_count++;
|
||||
cur_bridge->addresses = mosquitto__realloc(cur_bridge->addresses, sizeof(struct bridge_address)*(size_t)cur_bridge->address_count);
|
||||
if(!cur_bridge->addresses){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
cur_bridge->addresses[cur_bridge->address_count-1].address = token;
|
||||
}
|
||||
for(i=0; i<cur_bridge->address_count; i++){
|
||||
address = strtok_r(cur_bridge->addresses[i].address, ":", &saveptr);
|
||||
if(address){
|
||||
token = strtok_r(NULL, ":", &saveptr);
|
||||
if(token){
|
||||
tmp_int = atoi(token);
|
||||
if(tmp_int < 1 || tmp_int > UINT16_MAX){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid port value (%d).", tmp_int);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
cur_bridge->addresses[i].port = (uint16_t)tmp_int;
|
||||
}else{
|
||||
cur_bridge->addresses[i].port = 1883;
|
||||
}
|
||||
cur_bridge->addresses[i].address = mosquitto__strdup(address);
|
||||
//_conf_attempt_resolve(address, "bridge address", MOSQ_LOG_WARNING, "Warning");
|
||||
}
|
||||
}
|
||||
if(cur_bridge->address_count == 0){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty address value in configuration.");
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
}
|
||||
else if(!strcmp(token, "connection"))
|
||||
{
|
||||
nb_param ++;
|
||||
//if(reload) continue; // FIXME
|
||||
token = strtok_r(NULL, " ", &saveptr);
|
||||
if(token){
|
||||
/* Check for existing bridge name. */
|
||||
for(i=0; i<db->bridge_count; i++){
|
||||
if(!strcmp(db->bridges[i]->bridge->name, token)){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Duplicate bridge name \"%s\".", token);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
}
|
||||
|
||||
config->bridge_count++;
|
||||
config->bridges = mosquitto__realloc(config->bridges, (size_t)config->bridge_count*sizeof(struct mosquitto__bridge));
|
||||
if(!config->bridges){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
cur_bridge = &(config->bridges[config->bridge_count-1]);
|
||||
memset(cur_bridge, 0, sizeof(struct mosquitto__bridge));
|
||||
cur_bridge->name = mosquitto__strdup(token);
|
||||
if(!cur_bridge->name){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
cur_bridge->keepalive = 60;
|
||||
cur_bridge->notifications = true;
|
||||
cur_bridge->notifications_local_only = false;
|
||||
cur_bridge->start_type = bst_automatic;
|
||||
cur_bridge->idle_timeout = 60;
|
||||
cur_bridge->restart_timeout = 0;
|
||||
cur_bridge->backoff_base = 5;
|
||||
cur_bridge->backoff_cap = 30;
|
||||
cur_bridge->threshold = 10;
|
||||
cur_bridge->try_private = true;
|
||||
cur_bridge->attempt_unsubscribe = true;
|
||||
cur_bridge->protocol_version = mosq_p_mqtt311;
|
||||
cur_bridge->primary_retry_sock = INVALID_SOCKET;
|
||||
}else{
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty connection value in configuration.");
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
}
|
||||
else if(!strcmp(token, "try_private"))
|
||||
{
|
||||
nb_param ++;
|
||||
if(!cur_bridge){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
token = strtok_r(NULL, " ", &saveptr);
|
||||
if(token){
|
||||
if(!strcmp(token,"false")){
|
||||
cur_bridge->try_private = false;
|
||||
}else if(strcmp(token,"true")){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
}
|
||||
}
|
||||
else if(!strcmp(token, "notification_topic"))
|
||||
{
|
||||
nb_param ++;
|
||||
if(!cur_bridge){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
token = strtok_r(NULL, " ", &saveptr);
|
||||
if(token){
|
||||
cur_bridge->notification_topic = mosquitto__strdup(token);
|
||||
}
|
||||
}
|
||||
else if(!strcmp(token, "remote_username"))
|
||||
{
|
||||
log__printf(NULL, MOSQ_LOG_INFO, "Found remote_username");
|
||||
nb_param ++;
|
||||
if(!cur_bridge){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
token = strtok_r(NULL, " ", &saveptr);
|
||||
if(token){
|
||||
cur_bridge->remote_username = mosquitto__strdup(token);
|
||||
}
|
||||
}
|
||||
else if(!strcmp(token, "remote_clientid"))
|
||||
{
|
||||
nb_param ++;
|
||||
if(!cur_bridge){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
token = strtok_r(NULL, " ", &saveptr);
|
||||
if(token){
|
||||
cur_bridge->remote_clientid = mosquitto__strdup(token);
|
||||
}
|
||||
}
|
||||
else if(!strcmp(token, "topic"))
|
||||
{
|
||||
nb_param ++;
|
||||
if(!cur_bridge){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
token = strtok_r(NULL, " ", &saveptr);
|
||||
if(token){
|
||||
cur_bridge->topic_count++;
|
||||
cur_bridge->topics = mosquitto__realloc(cur_bridge->topics,
|
||||
sizeof(struct mosquitto__bridge_topic)*(size_t)cur_bridge->topic_count);
|
||||
if(!cur_bridge->topics){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
cur_topic = &cur_bridge->topics[cur_bridge->topic_count-1];
|
||||
if(!strcmp(token, "\"\"")){
|
||||
cur_topic->topic = NULL;
|
||||
}else{
|
||||
cur_topic->topic = mosquitto__strdup(token);
|
||||
if(!cur_topic->topic){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
}
|
||||
cur_topic->direction = bd_out;
|
||||
cur_topic->qos = 0;
|
||||
cur_topic->local_prefix = NULL;
|
||||
cur_topic->remote_prefix = NULL;
|
||||
}else{
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty topic value in configuration.");
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
token = strtok_r(NULL, " ", &saveptr);
|
||||
if(token){
|
||||
if(!strcasecmp(token, "out")){
|
||||
cur_topic->direction = bd_out;
|
||||
}else if(!strcasecmp(token, "in")){
|
||||
cur_topic->direction = bd_in;
|
||||
}else if(!strcasecmp(token, "both")){
|
||||
cur_topic->direction = bd_both;
|
||||
}else{
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic direction '%s'.", token);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
token = strtok_r(NULL, " ", &saveptr);
|
||||
if(token){
|
||||
cur_topic->qos = (uint8_t)atoi(token);
|
||||
if(cur_topic->qos < 0 || cur_topic->qos > 2){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge QoS level '%s'.", token);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
|
||||
token = strtok_r(NULL, " ", &saveptr);
|
||||
if(token){
|
||||
cur_bridge->topic_remapping = true;
|
||||
if(!strcmp(token, "\"\"")){
|
||||
cur_topic->local_prefix = NULL;
|
||||
}else{
|
||||
if(mosquitto_pub_topic_check(token) != MOSQ_ERR_SUCCESS){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic local prefix '%s'.", token);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
cur_topic->local_prefix = mosquitto__strdup(token);
|
||||
if(!cur_topic->local_prefix){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
}
|
||||
|
||||
token = strtok_r(NULL, " ", &saveptr);
|
||||
if(token){
|
||||
if(!strcmp(token, "\"\"")){
|
||||
cur_topic->remote_prefix = NULL;
|
||||
}else{
|
||||
if(mosquitto_pub_topic_check(token) != MOSQ_ERR_SUCCESS){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic remote prefix '%s'.", token);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
cur_topic->remote_prefix = mosquitto__strdup(token);
|
||||
if(!cur_topic->remote_prefix){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if(cur_topic->topic == NULL &&
|
||||
(cur_topic->local_prefix == NULL || cur_topic->remote_prefix == NULL)){
|
||||
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge remapping.");
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
if(cur_topic->local_prefix){
|
||||
if(cur_topic->topic){
|
||||
len = (int)strlen(cur_topic->topic) + (int)strlen(cur_topic->local_prefix)+1;
|
||||
cur_topic->local_topic = mosquitto__malloc((size_t)len+1);
|
||||
if(!cur_topic->local_topic){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
snprintf(cur_topic->local_topic, (size_t)len+1, "%s%s", cur_topic->local_prefix, cur_topic->topic);
|
||||
cur_topic->local_topic[len] = '\0';
|
||||
}else{
|
||||
cur_topic->local_topic = mosquitto__strdup(cur_topic->local_prefix);
|
||||
if(!cur_topic->local_topic){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
}
|
||||
}else{
|
||||
cur_topic->local_topic = mosquitto__strdup(cur_topic->topic);
|
||||
if(!cur_topic->local_topic){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
}
|
||||
|
||||
if(cur_topic->remote_prefix){
|
||||
if(cur_topic->topic){
|
||||
len = (int)strlen(cur_topic->topic) + (int)strlen(cur_topic->remote_prefix)+1;
|
||||
cur_topic->remote_topic = mosquitto__malloc((size_t)len+1);
|
||||
if(!cur_topic->remote_topic){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
snprintf(cur_topic->remote_topic, (size_t)len, "%s%s", cur_topic->remote_prefix, cur_topic->topic);
|
||||
cur_topic->remote_topic[len] = '\0';
|
||||
}else{
|
||||
cur_topic->remote_topic = mosquitto__strdup(cur_topic->remote_prefix);
|
||||
if(!cur_topic->remote_topic){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
}
|
||||
}else{
|
||||
cur_topic->remote_topic = mosquitto__strdup(cur_topic->topic);
|
||||
if(!cur_topic->remote_topic){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
buf = strtok(NULL, "\n");
|
||||
}
|
||||
|
||||
if(nb_param>=3){
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}else{
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
}
|
||||
|
||||
int bridge__dynamic_parse_payload_del_json(void* payload, struct mosquitto_db *db, int *index)
|
||||
{
|
||||
int i;
|
||||
|
||||
#ifndef WITH_CJSON
|
||||
return bridge__dynamic_parse_payload_del(payload,db,index);
|
||||
#endif
|
||||
|
||||
cJSON *message_json = cJSON_Parse(payload);
|
||||
if(message_json == NULL){
|
||||
const char *error_ptr = cJSON_GetErrorPtr();
|
||||
if(error_ptr != NULL){
|
||||
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Unable to parse JSON Message for bridge dynamic. Maybe normal message configuration. %s", error_ptr);
|
||||
}
|
||||
cJSON_Delete(message_json);
|
||||
return bridge__dynamic_parse_payload_del(payload,db,index);
|
||||
}
|
||||
|
||||
const cJSON *connection_json = NULL;
|
||||
connection_json = cJSON_GetObjectItemCaseSensitive(message_json, "connection");
|
||||
if(cJSON_IsString(connection_json) && (connection_json->valuestring != NULL)) {
|
||||
/* Check for existing bridge name. */
|
||||
for(i=0; i<db->bridge_count; i++){
|
||||
if(!strcmp(db->bridges[i]->bridge->name, connection_json->valuestring)){
|
||||
*index = i;
|
||||
}
|
||||
}
|
||||
}else{
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty connection value in configuration.");
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
int bridge__dynamic_parse_payload_del(void* payload, struct mosquitto_db *db, int *index)
|
||||
{
|
||||
char *buf;
|
||||
char *token;
|
||||
char *saveptr = NULL;
|
||||
int i;
|
||||
|
||||
buf = strdup(payload);
|
||||
if(buf[0] != '#' && buf[0] != 10 && buf[0] != 13){
|
||||
while(buf[strlen(buf)-1] == 10 || buf[strlen(buf)-1] == 13){
|
||||
buf[strlen(buf)-1] = 0;
|
||||
}
|
||||
token = strtok_r(buf, " ", &saveptr);
|
||||
|
||||
if(token)
|
||||
{
|
||||
if(!strcmp(token, "connection"))
|
||||
{
|
||||
//if(reload) continue; // FIXME
|
||||
token = strtok_r(NULL, " ", &saveptr);
|
||||
if(token){
|
||||
/* Check for existing bridge name. */
|
||||
for(i=0; i<db->bridge_count; i++){
|
||||
if(!strcmp(db->bridges[i]->bridge->name, token)){
|
||||
*index = i;
|
||||
}
|
||||
}
|
||||
}else{
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty connection value in configuration.");
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
static int config__check(struct mosquitto__config *config)
|
||||
{
|
||||
/* Checks that are easy to make after the config has been loaded. */
|
||||
|
||||
#ifdef WITH_BRIDGE
|
||||
int i, j;
|
||||
struct mosquitto__bridge *bridge1, *bridge2;
|
||||
char hostname[256];
|
||||
int len;
|
||||
|
||||
/* Check for bridge duplicate local_clientid, need to generate missing IDs
|
||||
* first. */
|
||||
for(i=0; i<config->bridge_count; i++){
|
||||
bridge1 = &config->bridges[i];
|
||||
|
||||
if(!bridge1->remote_clientid){
|
||||
if(!gethostname(hostname, 256)){
|
||||
len = (int)strlen(hostname) + (int)strlen(bridge1->name) + 2;
|
||||
bridge1->remote_clientid = mosquitto__malloc((size_t)len);
|
||||
if(!bridge1->remote_clientid){
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
snprintf(bridge1->remote_clientid, (size_t)len, "%s.%s", hostname, bridge1->name);
|
||||
}else{
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
if(!bridge1->local_clientid){
|
||||
len = (int)strlen(bridge1->remote_clientid) + (int)strlen("local.") + 2;
|
||||
bridge1->local_clientid = mosquitto__malloc((size_t)len);
|
||||
if(!bridge1->local_clientid){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
snprintf(bridge1->local_clientid, (size_t)len, "local.%s", bridge1->remote_clientid);
|
||||
}
|
||||
}
|
||||
|
||||
for(i=0; i<config->bridge_count; i++){
|
||||
bridge1 = &config->bridges[i];
|
||||
for(j=i+1; j<config->bridge_count; j++){
|
||||
bridge2 = &config->bridges[j];
|
||||
if(!strcmp(bridge1->local_clientid, bridge2->local_clientid)){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Bridge local_clientid "
|
||||
"'%s' is not unique. Try changing or setting the "
|
||||
"local_clientid value for one of the bridges.",
|
||||
bridge1->local_clientid);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
Loading…
Reference in New Issue