From e5237ae7e5c7adeaed821cf85a9354c3dae204bd Mon Sep 17 00:00:00 2001 From: Roger Light Date: Tue, 15 Oct 2019 14:26:51 +0100 Subject: [PATCH] Add support for sub/rr v5 prop output in JSON mode --- ChangeLog.txt | 2 + client/Makefile | 2 +- client/rr_client.c | 5 +- client/sub_client.c | 5 +- client/sub_client_output.c | 121 ++++++++++++++++++++++++++++++++++--- client/sub_client_output.h | 25 ++++++++ 6 files changed, 144 insertions(+), 16 deletions(-) create mode 100644 client/sub_client_output.h diff --git a/ChangeLog.txt b/ChangeLog.txt index 4595abf2..fb4c258a 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -22,6 +22,8 @@ Clients: - Add support for connecting to brokers through Unix domain sockets with the `--unix` argument. - Use cJSON library for producing JSON output, where available. Closes #1222. +- Add support for outputting MQTT v5 property information to mosquitto_sub/rr + JSON output. 1.6.7 - 20190925 diff --git a/client/Makefile b/client/Makefile index 54bb9edd..5f0f4d18 100644 --- a/client/Makefile +++ b/client/Makefile @@ -47,7 +47,7 @@ pub_shared.o : pub_shared.c ${SHARED_DEP} sub_client.o : sub_client.c ${SHARED_DEP} ${CROSS_COMPILE}${CC} $(CLIENT_CPPFLAGS) $(CLIENT_CFLAGS) -c $< -o $@ -sub_client_output.o : sub_client_output.c ${SHARED_DEP} +sub_client_output.o : sub_client_output.c sub_client_output.h ${SHARED_DEP} ${CROSS_COMPILE}${CC} $(CLIENT_CPPFLAGS) $(CLIENT_CFLAGS) -c $< -o $@ rr_client.o : rr_client.c ${SHARED_DEP} diff --git a/client/rr_client.c b/client/rr_client.c index aa8ef5e6..cd8282d5 100644 --- a/client/rr_client.c +++ b/client/rr_client.c @@ -35,6 +35,7 @@ Contributors: #include #include "client_shared.h" #include "pub_shared.h" +#include "sub_client_output.h" enum rr__state { rr_s_new, @@ -64,8 +65,6 @@ void my_signal_handler(int signum) } #endif -void print_message(struct mosq_config *cfg, const struct mosquitto_message *message); - int my_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, void *payload, int qos, bool retain) { @@ -75,7 +74,7 @@ int my_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadl void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message, const mosquitto_property *properties) { - print_message(&cfg, message); + print_message(&cfg, message, properties); switch(cfg.pub_mode){ case MSGMODE_CMD: case MSGMODE_FILE: diff --git a/client/sub_client.c b/client/sub_client.c index 6b2f9bd4..4e8f0f44 100644 --- a/client/sub_client.c +++ b/client/sub_client.c @@ -34,6 +34,7 @@ Contributors: #include #include #include "client_shared.h" +#include "sub_client_output.h" struct mosq_config cfg; bool process_messages = true; @@ -53,8 +54,6 @@ void my_signal_handler(int signum) } #endif -void print_message(struct mosq_config *cfg, const struct mosquitto_message *message); - void my_publish_callback(struct mosquitto *mosq, void *obj, int mid, int reason_code, const mosquitto_property *properties) { @@ -98,7 +97,7 @@ void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquit } } - print_message(&cfg, message); + print_message(&cfg, message, properties); if(cfg.msg_count>0){ msg_count++; diff --git a/client/sub_client_output.c b/client/sub_client_output.c index 9eb1c6de..41f1cd65 100644 --- a/client/sub_client_output.c +++ b/client/sub_client_output.c @@ -39,7 +39,9 @@ Contributors: #endif #include +#include #include "client_shared.h" +#include "sub_client_output.h" extern struct mosq_config cfg; @@ -116,7 +118,97 @@ static void write_json_payload(const char *payload, int payloadlen) #endif -static int json_print(const struct mosquitto_message *message, const struct tm *ti, bool escaped) +#ifdef WITH_CJSON +static int json_print_properties(cJSON *root, const mosquitto_property *properties) +{ + int identifier; + uint8_t i8value; + uint16_t i16value; + uint32_t i32value; + char *strname, *strvalue; + char *binvalue; + cJSON *tmp, *prop_json, *user_json = NULL; + const mosquitto_property *prop; + + prop_json = cJSON_CreateObject(); + if(prop_json == NULL){ + cJSON_Delete(prop_json); + return MOSQ_ERR_NOMEM; + } + cJSON_AddItemToObject(root, "properties", prop_json); + + for(prop=properties; prop != NULL; prop = mosquitto_property_next(prop)){ + tmp = NULL; + identifier = mosquitto_property_identifier(prop); + switch(identifier){ + case MQTT_PROP_PAYLOAD_FORMAT_INDICATOR: + mosquitto_property_read_byte(prop, MQTT_PROP_PAYLOAD_FORMAT_INDICATOR, &i8value, false); + tmp = cJSON_CreateNumber(i8value); + break; + + case MQTT_PROP_MESSAGE_EXPIRY_INTERVAL: + mosquitto_property_read_int32(prop, MQTT_PROP_MESSAGE_EXPIRY_INTERVAL, &i32value, false); + tmp = cJSON_CreateNumber(i32value); + break; + + case MQTT_PROP_CONTENT_TYPE: + case MQTT_PROP_RESPONSE_TOPIC: + mosquitto_property_read_string(prop, identifier, &strvalue, false); + if(strvalue == NULL) return MOSQ_ERR_NOMEM; + tmp = cJSON_CreateString(strvalue); + free(strvalue); + break; + + case MQTT_PROP_CORRELATION_DATA: + mosquitto_property_read_binary(prop, MQTT_PROP_CORRELATION_DATA, (void **)&binvalue, &i16value, false); + if(binvalue == NULL) return MOSQ_ERR_NOMEM; + tmp = cJSON_CreateString(binvalue); + free(binvalue); + break; + + case MQTT_PROP_SUBSCRIPTION_IDENTIFIER: + mosquitto_property_read_varint(prop, MQTT_PROP_SUBSCRIPTION_IDENTIFIER, &i32value, false); + tmp = cJSON_CreateNumber(i32value); + break; + + case MQTT_PROP_TOPIC_ALIAS: + mosquitto_property_read_int16(prop, MQTT_PROP_MESSAGE_EXPIRY_INTERVAL, &i16value, false); + tmp = cJSON_CreateNumber(i16value); + break; + + case MQTT_PROP_USER_PROPERTY: + if(user_json == NULL){ + user_json = cJSON_CreateObject(); + if(user_json == NULL){ + return MOSQ_ERR_NOMEM; + } + cJSON_AddItemToObject(prop_json, "user-properties", user_json); + } + mosquitto_property_read_string_pair(prop, MQTT_PROP_USER_PROPERTY, &strname, &strvalue, false); + if(strname == NULL || strvalue == NULL) return MOSQ_ERR_NOMEM; + + tmp = cJSON_CreateString(strvalue); + free(strvalue); + + if(tmp == NULL){ + free(strname); + return MOSQ_ERR_NOMEM; + } + cJSON_AddItemToObject(user_json, strname, tmp); + free(strname); + tmp = NULL; /* Don't add this to prop_json below */ + break; + } + if(tmp != NULL){ + cJSON_AddItemToObject(prop_json, mosquitto_property_identifier_to_string(identifier), tmp); + } + } + return MOSQ_ERR_SUCCESS; +} +#endif + + +static int json_print(const struct mosquitto_message *message, const mosquitto_property *properties, const struct tm *ti, bool escaped) { #ifdef WITH_CJSON cJSON *root; @@ -136,7 +228,7 @@ static int json_print(const struct mosquitto_message *message, const struct tm * } cJSON_AddItemToObject(root, "tst", tmp); - tmp = cJSON_CreateStringReference(message->topic); + tmp = cJSON_CreateString(message->topic); if(tmp == NULL){ cJSON_Delete(root); return MOSQ_ERR_NOMEM; @@ -173,8 +265,18 @@ static int json_print(const struct mosquitto_message *message, const struct tm * } cJSON_AddItemToObject(root, "mid", tmp); } + + /* Properties */ + if(properties){ + if(json_print_properties(root, properties)){ + cJSON_Delete(root); + return MOSQ_ERR_NOMEM; + } + } + + /* Payload */ if(escaped){ - tmp = cJSON_CreateStringReference(message->payload); + tmp = cJSON_CreateString(message->payload); if(tmp == NULL){ cJSON_Delete(root); return MOSQ_ERR_NOMEM; @@ -190,7 +292,8 @@ static int json_print(const struct mosquitto_message *message, const struct tm * cJSON_AddItemToObject(root, "payload", tmp); } - json_str = cJSON_PrintUnformatted(root); + //json_str = cJSON_PrintUnformatted(root); + json_str = cJSON_Print(root); cJSON_Delete(root); fputs(json_str, stdout); @@ -220,7 +323,7 @@ static int json_print(const struct mosquitto_message *message, const struct tm * } -static void formatted_print(const struct mosq_config *lcfg, const struct mosquitto_message *message) +static void formatted_print(const struct mosq_config *lcfg, const struct mosquitto_message *message, const mosquitto_property *properties) { int len; int i; @@ -260,7 +363,7 @@ static void formatted_print(const struct mosq_config *lcfg, const struct mosquit return; } } - if(json_print(message, ti, true) != MOSQ_ERR_SUCCESS){ + if(json_print(message, properties, ti, true) != MOSQ_ERR_SUCCESS){ err_printf(lcfg, "Error: Out of memory.\n"); return; } @@ -273,7 +376,7 @@ static void formatted_print(const struct mosq_config *lcfg, const struct mosquit return; } } - rc = json_print(message, ti, false); + rc = json_print(message, properties, ti, false); if(rc == MOSQ_ERR_NOMEM){ err_printf(lcfg, "Error: Out of memory.\n"); return; @@ -406,10 +509,10 @@ static void formatted_print(const struct mosq_config *lcfg, const struct mosquit } -void print_message(struct mosq_config *cfg, const struct mosquitto_message *message) +void print_message(struct mosq_config *cfg, const struct mosquitto_message *message, const mosquitto_property *properties) { if(cfg->format){ - formatted_print(cfg, message); + formatted_print(cfg, message, properties); }else if(cfg->verbose){ if(message->payloadlen){ printf("%s ", message->topic); diff --git a/client/sub_client_output.h b/client/sub_client_output.h new file mode 100644 index 00000000..cd8c75fe --- /dev/null +++ b/client/sub_client_output.h @@ -0,0 +1,25 @@ +/* +Copyright (c) 2019 Roger Light + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License v1.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +Contributors: + Roger Light - initial implementation and documentation. +*/ + +#ifndef SUB_CLIENT_OUTPUT_H +#define SUB_CLIENT_OUTPUT_H + +#include "mosquitto.h" +#include "client_shared.h" + +void print_message(struct mosq_config *cfg, const struct mosquitto_message *message, const mosquitto_property *properties); + +#endif