diff --git a/ChangeLog.txt b/ChangeLog.txt
index 215ca204..6d22cf5c 100644
--- a/ChangeLog.txt
+++ b/ChangeLog.txt
@@ -17,6 +17,12 @@ Security:
is not a useful configuration, this behaviour is unexpected and could lead
to access being incorrectly granted in some circumstances. This is now
fixed. Affects versions 1.0 to 1.5.5 inclusive.
+- Fix CVE-2018-12546. If a client publishes a retained message to a topic that
+ they have access to, and then their access to that topic is revoked, the
+ retained message will still be delivered to future subscribers. This
+ behaviour may be undesirable in some applications, so a configuration option
+ `check_retain_source` has been introduced to enforce checking of the
+ retained message source on publish.
Broker:
- Fixed comment handling for config options that have optional arguments.
diff --git a/man/mosquitto.conf.5.xml b/man/mosquitto.conf.5.xml
index 8b212e2d..d941c985 100644
--- a/man/mosquitto.conf.5.xml
+++ b/man/mosquitto.conf.5.xml
@@ -294,6 +294,24 @@
Reloaded on reload signal.
+
+ [ true | false ]
+
+ This option affects the scenario when a client
+ subscribes to a topic that has retained messages. It is
+ possible that the client that published the retained
+ message to the topic had access at the time they
+ published, but that access has been subsequently
+ removed. If is set
+ to true, the default, the source of a retained message
+ will be checked for access rights before it is
+ republished. When set to false, no check will be made
+ and the retained message will always be
+ published.
+ This option applies globally, regardless of the
+ option.
+
+
prefix
diff --git a/mosquitto.conf b/mosquitto.conf
index 96450050..e8c43397 100644
--- a/mosquitto.conf
+++ b/mosquitto.conf
@@ -148,6 +148,15 @@
# setting behaviour from previous versions of mosquitto.
#per_listener_settings false
+# This option affects the scenario when a client subscribes to a topic that has
+# retained messages. It is possible that the client that published the retained
+# message to the topic had access at the time they published, but that access
+# has been subsequently removed. If check_retain_source is set to true, the
+# default, the source of a retained message will be checked for access rights
+# before it is republished. When set to false, no check will be made and the
+# retained message will always be published. This affects all listeners.
+#check_retain_source true
+
# =================================================================
# Default listener
diff --git a/src/conf.c b/src/conf.c
index c995babe..f81219f3 100644
--- a/src/conf.c
+++ b/src/conf.c
@@ -1093,6 +1093,9 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#else
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: TLS support not available.");
#endif
+ }else if(!strcmp(token, "check_retain_source")){
+ conf__set_cur_security_options(config, cur_listener, &cur_security_options);
+ if(conf__parse_bool(&token, "check_retain_source", &config->check_retain_source, saveptr)) return MOSQ_ERR_INVAL;
}else if(!strcmp(token, "ciphers")){
#ifdef WITH_TLS
if(reload) continue; // Listeners not valid for reloading.
diff --git a/src/database.c b/src/database.c
index 2f76de12..8253cb41 100644
--- a/src/database.c
+++ b/src/database.c
@@ -200,6 +200,7 @@ void db__msg_store_remove(struct mosquitto_db *db, struct mosquitto_msg_store *s
db->msg_store_bytes -= store->payloadlen;
mosquitto__free(store->source_id);
+ mosquitto__free(store->source_username);
if(store->dest_ids){
for(i=0; idest_id_count; i++){
mosquitto__free(store->dest_ids[i]);
@@ -587,18 +588,19 @@ int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context,
}
memcpy(UHPA_ACCESS(payload_uhpa, payloadlen), payload, payloadlen);
+ if(db__message_store(db, context, 0, topic_heap, qos, payloadlen, &payload_uhpa, retain, &stored, 0)) return 1;
+
if(context && context->id){
source_id = context->id;
}else{
source_id = "";
}
- if(db__message_store(db, source_id, 0, topic_heap, qos, payloadlen, &payload_uhpa, retain, &stored, 0)) return 1;
return sub__messages_queue(db, source_id, topic_heap, qos, retain, &stored);
}
/* This function requires topic to be allocated on the heap. Once called, it owns topic and will free it on error. Likewise payload. */
-int db__message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id)
+int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id)
{
struct mosquitto_msg_store *temp = NULL;
int rc = MOSQ_ERR_SUCCESS;
@@ -606,7 +608,7 @@ int db__message_store(struct mosquitto_db *db, const char *source, uint16_t sour
assert(db);
assert(stored);
- temp = mosquitto__malloc(sizeof(struct mosquitto_msg_store));
+ temp = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store));
if(!temp){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
rc = MOSQ_ERR_NOMEM;
@@ -617,8 +619,8 @@ int db__message_store(struct mosquitto_db *db, const char *source, uint16_t sour
temp->payload.ptr = NULL;
temp->ref_count = 0;
- if(source){
- temp->source_id = mosquitto__strdup(source);
+ if(source && source->id){
+ temp->source_id = mosquitto__strdup(source->id);
}else{
temp->source_id = mosquitto__strdup("");
}
@@ -627,6 +629,17 @@ int db__message_store(struct mosquitto_db *db, const char *source, uint16_t sour
rc = MOSQ_ERR_NOMEM;
goto error;
}
+
+ if(source && source->username){
+ temp->source_username = mosquitto__strdup(source->username);
+ if(!temp->source_username){
+ rc = MOSQ_ERR_NOMEM;
+ goto error;
+ }
+ }
+ if(source){
+ temp->source_listener = source->listener;
+ }
temp->source_mid = source_mid;
temp->mid = 0;
temp->qos = qos;
@@ -659,6 +672,7 @@ error:
mosquitto__free(topic);
if(temp){
mosquitto__free(temp->source_id);
+ mosquitto__free(temp->source_username);
mosquitto__free(temp->topic);
mosquitto__free(temp);
}
diff --git a/src/db_dump/Makefile b/src/db_dump/Makefile
index 13ae261b..202af870 100644
--- a/src/db_dump/Makefile
+++ b/src/db_dump/Makefile
@@ -1,6 +1,6 @@
include ../../config.mk
-CFLAGS_FINAL=${CFLAGS} -I.. -I../../lib -I../..
+CFLAGS_FINAL=${CFLAGS} -I.. -I../../lib -I../.. -I../deps
.PHONY: all clean reallyclean
diff --git a/src/db_dump/db_dump.c b/src/db_dump/db_dump.c
index e009cce5..62bf24be 100644
--- a/src/db_dump/db_dump.c
+++ b/src/db_dump/db_dump.c
@@ -83,7 +83,9 @@ struct db_msg
uint8_t qos, retain;
uint8_t *payload;
char *source_id;
+ char *source_username;
char *topic;
+ uint16_t source_port;
};
static uint32_t db_version;
@@ -177,6 +179,8 @@ print_db_msg(struct db_msg *msg, int length)
printf("\tLength: %d\n", length);
printf("\tStore ID: %" PRIu64 "\n", msg->store_id);
printf("\tSource ID: %s\n", msg->source_id);
+ printf("\tSource Username: %s\n", msg->source_username);
+ printf("\tSource Port: %d\n", msg->source_port);
printf("\tSource MID: %d\n", msg->source_mid);
printf("\tMID: %d\n", msg->mid);
printf("\tTopic: %s\n", msg->topic);
@@ -194,26 +198,49 @@ print_db_msg(struct db_msg *msg, int length)
}
+static int persist__read_string(FILE *db_fptr, char **str)
+{
+ uint16_t i16temp;
+ uint16_t slen;
+ char *s = NULL;
+
+ if(fread(&i16temp, 1, sizeof(uint16_t), db_fptr) != sizeof(uint16_t)){
+ return MOSQ_ERR_INVAL;
+ }
+
+ slen = ntohs(i16temp);
+ if(slen){
+ s = mosquitto__malloc(slen+1);
+ if(!s){
+ fclose(db_fptr);
+ fprintf(stderr, "Error: Out of memory.\n");
+ return MOSQ_ERR_NOMEM;
+ }
+ if(fread(s, 1, slen, db_fptr) != slen){
+ mosquitto__free(s);
+ fprintf(stderr, "Error: %s.\n", strerror(errno));
+ return MOSQ_ERR_INVAL;
+ }
+ s[slen] = '\0';
+ }
+
+ *str = s;
+ return MOSQ_ERR_SUCCESS;
+}
+
+
static int db__client_chunk_restore(struct mosquitto_db *db, FILE *db_fd, struct db_client *client)
{
- uint16_t i16temp, slen;
+ uint16_t i16temp;
int rc = 0;
struct client_chunk *cc;
- read_e(db_fd, &i16temp, sizeof(uint16_t));
- slen = ntohs(i16temp);
- if(!slen){
+ rc = persist__read_string(db_fd, &client->client_id);
+ if(rc){
fprintf(stderr, "Error: Corrupt persistent database.");
fclose(db_fd);
- return 1;
+ return rc;
}
- client->client_id = calloc(slen+1, sizeof(char));
- if(!client->client_id){
- fclose(db_fd);
- fprintf(stderr, "Error: Out of memory.");
- return 1;
- }
- read_e(db_fd, client->client_id, slen);
read_e(db_fd, &i16temp, sizeof(uint16_t));
client->last_mid = ntohs(i16temp);
@@ -245,24 +272,17 @@ error:
static int db__client_msg_chunk_restore(struct mosquitto_db *db, FILE *db_fd, uint32_t length, struct db_client_msg *msg)
{
dbid_t i64temp;
- uint16_t i16temp, slen;
+ uint16_t i16temp;
struct client_chunk *cc;
struct msg_store_chunk *msc;
+ int rc;
- read_e(db_fd, &i16temp, sizeof(uint16_t));
- slen = ntohs(i16temp);
- if(!slen){
+ rc = persist__read_string(db_fd, &msg->client_id);
+ if(rc){
fprintf(stderr, "Error: Corrupt persistent database.");
fclose(db_fd);
- return 1;
+ return rc;
}
- msg->client_id = calloc(slen+1, sizeof(char));
- if(!msg->client_id){
- fclose(db_fd);
- fprintf(stderr, "Error: Out of memory.");
- return 1;
- }
- read_e(db_fd, msg->client_id, slen);
read_e(db_fd, &i64temp, sizeof(dbid_t));
msg->store_id = i64temp;
@@ -301,58 +321,48 @@ static int db__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fd, uin
{
dbid_t i64temp;
uint32_t i32temp;
- uint16_t i16temp, slen;
+ uint16_t i16temp;
int rc = 0;
struct msg_store_chunk *mcs;
read_e(db_fd, &i64temp, sizeof(dbid_t));
msg->store_id = i64temp;
- read_e(db_fd, &i16temp, sizeof(uint16_t));
- slen = ntohs(i16temp);
- if(slen){
- msg->source_id = calloc(slen+1, sizeof(char));
- if(!msg->source_id){
- fclose(db_fd);
- fprintf(stderr, "Error: Out of memory.");
- return 1;
- }
- if(fread(msg->source_id, 1, slen, db_fd) != slen){
+ rc = persist__read_string(db_fd, &msg->source_id);
+ if(rc){
+ fprintf(stderr, "Error: Corrupt persistent database.");
+ fclose(db_fd);
+ return rc;
+ }
+ if(db_version == 4){
+ rc = persist__read_string(db_fd, &msg->source_username);
+ if(rc){
fprintf(stderr, "Error: %s.", strerror(errno));
fclose(db_fd);
free(msg->source_id);
+ free(msg->topic);
+ free(msg->payload);
+ free(msg->source_id);
return 1;
}
+ read_e(db_fd, &i16temp, sizeof(uint16_t));
+ msg->source_port = ntohs(i16temp);
}
+
+
read_e(db_fd, &i16temp, sizeof(uint16_t));
msg->source_mid = ntohs(i16temp);
read_e(db_fd, &i16temp, sizeof(uint16_t));
msg->mid = ntohs(i16temp);
- read_e(db_fd, &i16temp, sizeof(uint16_t));
- slen = ntohs(i16temp);
- if(slen){
- msg->topic = calloc(slen+1, sizeof(char));
- if(!msg->topic){
- fclose(db_fd);
- free(msg->source_id);
- fprintf(stderr, "Error: Out of memory.");
- return 1;
- }
- if(fread(msg->topic, 1, slen, db_fd) != slen){
- fprintf(stderr, "Error: %s.", strerror(errno));
- fclose(db_fd);
- free(msg->source_id);
- free(msg->topic);
- return 1;
- }
- }else{
- fprintf(stderr, "Error: Invalid msg_store chunk when restoring persistent database.");
+ rc = persist__read_string(db_fd, &msg->topic);
+ if(rc){
+ fprintf(stderr, "Error: Corrupt persistent database.");
fclose(db_fd);
- free(msg->source_id);
- return 1;
+ return rc;
}
+
read_e(db_fd, &msg->qos, sizeof(uint8_t));
read_e(db_fd, &msg->retain, sizeof(uint8_t));
@@ -415,29 +425,23 @@ static int db__retain_chunk_restore(struct mosquitto_db *db, FILE *db_fd)
static int db__sub_chunk_restore(struct mosquitto_db *db, FILE *db_fd, uint32_t length, struct db_sub *sub)
{
- uint16_t i16temp, slen;
int rc = 0;
struct client_chunk *cc;
- read_e(db_fd, &i16temp, sizeof(uint16_t));
- slen = ntohs(i16temp);
- sub->client_id = calloc(slen+1, sizeof(char));
- if(!sub->client_id){
+ rc = persist__read_string(db_fd, &sub->client_id);
+ if(rc){
+ fprintf(stderr, "Error: Corrupt persistent database.");
fclose(db_fd);
- fprintf(stderr, "Error: Out of memory.");
- return 1;
+ return rc;
}
- read_e(db_fd, sub->client_id, slen);
- read_e(db_fd, &i16temp, sizeof(uint16_t));
- slen = ntohs(i16temp);
- sub->topic = calloc(slen+1, sizeof(char));
- if(!sub->topic){
+
+ rc = persist__read_string(db_fd, &sub->topic);
+ if(rc){
+ fprintf(stderr, "Error: Corrupt persistent database.");
fclose(db_fd);
- fprintf(stderr, "Error: Out of memory.");
- free(sub->client_id);
- return 1;
+ return rc;
}
- read_e(db_fd, sub->topic, slen);
+
read_e(db_fd, &sub->qos, sizeof(uint8_t));
if(client_stats){
diff --git a/src/handle_connect.c b/src/handle_connect.c
index b9b0fefd..f47edb3b 100644
--- a/src/handle_connect.c
+++ b/src/handle_connect.c
@@ -123,13 +123,11 @@ int handle__connect(struct mosquitto_db *db, struct mosquitto *context)
uint8_t username_flag, password_flag;
char *username = NULL, *password = NULL;
int rc;
- struct mosquitto__acl_user *acl_tail;
struct mosquitto *found_context;
int slen;
uint16_t slen16;
struct mosquitto__subleaf *leaf;
int i;
- struct mosquitto__security_options *security_opts;
#ifdef WITH_TLS
X509 *client_cert = NULL;
X509_NAME *name;
@@ -613,36 +611,8 @@ int handle__connect(struct mosquitto_db *db, struct mosquitto *context)
do_disconnect(db, found_context);
}
- /* Associate user with its ACL, assuming we have ACLs loaded. */
- if(db->config->per_listener_settings){
- if(!context->listener){
- return 1;
- }
- security_opts = &context->listener->security_options;
- }else{
- security_opts = &db->config->security_options;
- }
-
- if(security_opts->acl_list){
- acl_tail = security_opts->acl_list;
- while(acl_tail){
- if(context->username){
- if(acl_tail->username && !strcmp(context->username, acl_tail->username)){
- context->acl_list = acl_tail;
- break;
- }
- }else{
- if(acl_tail->username == NULL){
- context->acl_list = acl_tail;
- break;
- }
- }
- acl_tail = acl_tail->next;
- }
- }else{
- context->acl_list = NULL;
- }
-
+ rc = acl__find_acls(db, context);
+ if(rc) return rc;
if(will_struct){
context->will = will_struct;
diff --git a/src/handle_publish.c b/src/handle_publish.c
index 54976afc..76b3ee89 100644
--- a/src/handle_publish.c
+++ b/src/handle_publish.c
@@ -184,7 +184,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
}
if(!stored){
dup = 0;
- if(db__message_store(db, context->id, mid, topic, qos, payloadlen, &payload, retain, &stored, 0)){
+ if(db__message_store(db, context, mid, topic, qos, payloadlen, &payload, retain, &stored, 0)){
return 1;
}
}else{
@@ -229,7 +229,7 @@ process_bad_message:
case 2:
db__message_store_find(context, mid, &stored);
if(!stored){
- if(db__message_store(db, context->id, mid, NULL, qos, 0, NULL, false, &stored, 0)){
+ if(db__message_store(db, context, mid, NULL, qos, 0, NULL, false, &stored, 0)){
return 1;
}
res = db__message_insert(db, context, mid, mosq_md_in, qos, false, stored);
diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h
index 0b69a8cc..c3655981 100644
--- a/src/mosquitto_broker_internal.h
+++ b/src/mosquitto_broker_internal.h
@@ -248,6 +248,7 @@ struct mosquitto__config {
bool allow_duplicate_messages;
int autosave_interval;
bool autosave_on_changes;
+ bool check_retain_source;
char *clientid_prefixes;
bool connection_messages;
bool daemon;
@@ -312,6 +313,8 @@ struct mosquitto_msg_store{
struct mosquitto_msg_store *prev;
dbid_t db_id;
char *source_id;
+ char *source_username;
+ struct mosquitto__listener *source_listener;
char **dest_ids;
int dest_id_count;
int ref_count;
@@ -553,7 +556,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context);
void db__message_dequeue_first(struct mosquitto *context);
int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context);
int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain);
-int db__message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id);
+int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id);
int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored);
void db__msg_store_add(struct mosquitto_db *db, struct mosquitto_msg_store *store);
void db__msg_store_remove(struct mosquitto_db *db, struct mosquitto_msg_store *store);
@@ -607,6 +610,7 @@ void bridge__packet_cleanup(struct mosquitto *context);
/* ============================================================
* Security related functions
* ============================================================ */
+int acl__find_acls(struct mosquitto_db *db, struct mosquitto *context);
int mosquitto_security_module_init(struct mosquitto_db *db);
int mosquitto_security_module_cleanup(struct mosquitto_db *db);
diff --git a/src/persist.c b/src/persist.c
index 3299356d..2f400869 100644
--- a/src/persist.c
+++ b/src/persist.c
@@ -39,6 +39,8 @@ static uint32_t db_version;
static int persist__restore_sub(struct mosquitto_db *db, const char *client_id, const char *sub, int qos);
+static int persist__read_string(FILE *db_fptr, char **str);
+static int persist__write_string(FILE *db_fptr, const char *str, bool nullok);
static struct mosquitto *persist__find_or_add_context(struct mosquitto_db *db, const char *client_id, uint16_t last_mid)
{
@@ -139,7 +141,7 @@ static int persist__message_store_write(struct mosquitto_db *db, FILE *db_fptr)
uint32_t length;
dbid_t i64temp;
uint32_t i32temp;
- uint16_t i16temp, slen, tlen;
+ uint16_t i16temp, tlen;
uint8_t i8temp;
struct mosquitto_msg_store *stored;
bool force_no_retain;
@@ -168,10 +170,19 @@ static int persist__message_store_write(struct mosquitto_db *db, FILE *db_fptr)
}else{
tlen = 0;
}
- length = htonl(sizeof(dbid_t) + 2+strlen(stored->source_id) +
+ length = sizeof(dbid_t) + 2+strlen(stored->source_id) +
sizeof(uint16_t) + sizeof(uint16_t) +
2+tlen + sizeof(uint32_t) +
- stored->payloadlen + sizeof(uint8_t) + sizeof(uint8_t));
+ stored->payloadlen + sizeof(uint8_t) + sizeof(uint8_t)
+ + 2*sizeof(uint16_t);
+
+ if(stored->source_id){
+ length += strlen(stored->source_id);
+ }
+ if(stored->source_username){
+ length += strlen(stored->source_username);
+ }
+ length = htonl(length);
i16temp = htons(DB_CHUNK_MSG_STORE);
write_e(db_fptr, &i16temp, sizeof(uint16_t));
@@ -180,12 +191,15 @@ static int persist__message_store_write(struct mosquitto_db *db, FILE *db_fptr)
i64temp = stored->db_id;
write_e(db_fptr, &i64temp, sizeof(dbid_t));
- slen = strlen(stored->source_id);
- i16temp = htons(slen);
- write_e(db_fptr, &i16temp, sizeof(uint16_t));
- if(slen){
- write_e(db_fptr, stored->source_id, slen);
+ if(persist__write_string(db_fptr, stored->source_id, false)) return 1;
+ if(persist__write_string(db_fptr, stored->source_username, true)) return 1;
+ if(stored->source_listener){
+ i16temp = htons(stored->source_listener->port);
+ }else{
+ i16temp = 0;
}
+ write_e(db_fptr, &i16temp, sizeof(uint16_t));
+
i16temp = htons(stored->source_mid);
write_e(db_fptr, &i16temp, sizeof(uint16_t));
@@ -214,6 +228,7 @@ static int persist__message_store_write(struct mosquitto_db *db, FILE *db_fptr)
if(stored->payloadlen){
write_e(db_fptr, UHPA_ACCESS_PAYLOAD(stored), (unsigned int)stored->payloadlen);
}
+
stored = stored->next;
}
@@ -265,6 +280,60 @@ error:
return 1;
}
+
+static int persist__read_string(FILE *db_fptr, char **str)
+{
+ uint16_t i16temp;
+ uint16_t slen;
+ char *s = NULL;
+
+ if(fread(&i16temp, 1, sizeof(uint16_t), db_fptr) != sizeof(uint16_t)){
+ return MOSQ_ERR_INVAL;
+ }
+
+ slen = ntohs(i16temp);
+ if(slen){
+ s = mosquitto__malloc(slen+1);
+ if(!s){
+ fclose(db_fptr);
+ log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
+ return MOSQ_ERR_NOMEM;
+ }
+ if(fread(s, 1, slen, db_fptr) != slen){
+ mosquitto__free(s);
+ return MOSQ_ERR_NOMEM;
+ }
+ s[slen] = '\0';
+ }
+
+ *str = s;
+ return MOSQ_ERR_SUCCESS;
+}
+
+
+static int persist__write_string(FILE *db_fptr, const char *str, bool nullok)
+{
+ uint16_t i16temp, slen;
+
+ if(str){
+ slen = strlen(str);
+ i16temp = htons(slen);
+ write_e(db_fptr, &i16temp, sizeof(uint16_t));
+ write_e(db_fptr, str, slen);
+ }else if(nullok){
+ i16temp = htons(0);
+ write_e(db_fptr, &i16temp, sizeof(uint16_t));
+ }else{
+ return 1;
+ }
+
+ return MOSQ_ERR_SUCCESS;
+error:
+ log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno));
+ return 1;
+}
+
+
static int persist__subs_retain_write(struct mosquitto_db *db, FILE *db_fptr, struct mosquitto__subhier *node, const char *topic, int level)
{
struct mosquitto__subhier *subhier, *subhier_tmp;
@@ -642,10 +711,10 @@ static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fp
{
dbid_t i64temp, store_id;
uint32_t i32temp, payloadlen = 0;
- uint16_t i16temp, slen, source_mid;
+ uint16_t i16temp, source_mid, source_port = 0;
uint8_t qos, retain;
mosquitto__payload_uhpa payload;
- char *source_id = NULL;
+ struct mosquitto source;
char *topic = NULL;
int rc = 0;
struct mosquitto_msg_store *stored = NULL;
@@ -664,41 +733,45 @@ static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fp
read_e(db_fptr, &i64temp, sizeof(dbid_t));
store_id = i64temp;
- read_e(db_fptr, &i16temp, sizeof(uint16_t));
- slen = ntohs(i16temp);
- if(slen){
- source_id = mosquitto__malloc(slen+1);
- if(!source_id){
+ memset(&source, 0, sizeof(struct mosquitto));
+
+ rc = persist__read_string(db_fptr, &source.id);
+ if(rc){
+ mosquitto__free(load);
+ return rc;
+ }
+ if(db_version == 4){
+ rc = persist__read_string(db_fptr, &source.username);
+ if(rc){
mosquitto__free(load);
- fclose(db_fptr);
- log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
- return MOSQ_ERR_NOMEM;
+ return rc;
+ }
+ read_e(db_fptr, &i16temp, sizeof(uint16_t));
+ source_port = ntohs(i16temp);
+ if(source_port){
+ for(int i=0; iconfig->listener_count; i++){
+ if(db->config->listeners[i].port == source_port){
+ source.listener = &db->config->listeners[i];
+ break;
+ }
+ }
}
- read_e(db_fptr, source_id, slen);
- source_id[slen] = '\0';
}
+
read_e(db_fptr, &i16temp, sizeof(uint16_t));
source_mid = ntohs(i16temp);
/* This is the mid - don't need it */
read_e(db_fptr, &i16temp, sizeof(uint16_t));
- read_e(db_fptr, &i16temp, sizeof(uint16_t));
- slen = ntohs(i16temp);
- if(slen){
- topic = mosquitto__malloc(slen+1);
- if(!topic){
- mosquitto__free(load);
- fclose(db_fptr);
- mosquitto__free(source_id);
- log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
- return MOSQ_ERR_NOMEM;
- }
- read_e(db_fptr, topic, slen);
- topic[slen] = '\0';
- }else{
- topic = NULL;
+ rc = persist__read_string(db_fptr, &topic);
+ if(rc){
+ mosquitto__free(load);
+ fclose(db_fptr);
+ mosquitto__free(source.id);
+ return rc;
}
+
read_e(db_fptr, &qos, sizeof(uint8_t));
read_e(db_fptr, &retain, sizeof(uint8_t));
@@ -709,7 +782,7 @@ static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fp
if(UHPA_ALLOC(payload, payloadlen) == 0){
mosquitto__free(load);
fclose(db_fptr);
- mosquitto__free(source_id);
+ mosquitto__free(source.id);
mosquitto__free(topic);
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
@@ -717,8 +790,8 @@ static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fp
read_e(db_fptr, UHPA_ACCESS(payload, payloadlen), payloadlen);
}
- rc = db__message_store(db, source_id, source_mid, topic, qos, payloadlen, &payload, retain, &stored, store_id);
- mosquitto__free(source_id);
+ rc = db__message_store(db, &source, source_mid, topic, qos, payloadlen, &payload, retain, &stored, store_id);
+ mosquitto__free(source.id);
if(rc == MOSQ_ERR_SUCCESS){
load->db_id = stored->db_id;
@@ -737,7 +810,7 @@ error:
err = strerror(errno);
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", err);
fclose(db_fptr);
- mosquitto__free(source_id);
+ mosquitto__free(source.id);
mosquitto__free(topic);
UHPA_FREE(payload, payloadlen);
return 1;
@@ -768,35 +841,24 @@ static int persist__retain_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
static int persist__sub_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
{
- uint16_t i16temp, slen;
uint8_t qos;
char *client_id;
char *topic;
int rc = 0;
char *err;
- read_e(db_fptr, &i16temp, sizeof(uint16_t));
- slen = ntohs(i16temp);
- client_id = mosquitto__malloc(slen+1);
- if(!client_id){
+ rc = persist__read_string(db_fptr, &client_id);
+ if(rc){
fclose(db_fptr);
- log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
- return MOSQ_ERR_NOMEM;
+ return rc;
}
- read_e(db_fptr, client_id, slen);
- client_id[slen] = '\0';
- read_e(db_fptr, &i16temp, sizeof(uint16_t));
- slen = ntohs(i16temp);
- topic = mosquitto__malloc(slen+1);
- if(!topic){
- fclose(db_fptr);
- log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
+ rc = persist__read_string(db_fptr, &topic);
+ if(rc){
mosquitto__free(client_id);
- return MOSQ_ERR_NOMEM;
+ fclose(db_fptr);
+ return rc;
}
- read_e(db_fptr, topic, slen);
- topic[slen] = '\0';
read_e(db_fptr, &qos, sizeof(uint8_t));
if(persist__restore_sub(db, client_id, topic, qos)){
@@ -852,7 +914,9 @@ int persist__restore(struct mosquitto_db *db)
* Is your DB change still compatible with previous versions?
*/
if(db_version > MOSQ_DB_VERSION && db_version != 0){
- if(db_version == 2){
+ if(db_version == 3){
+ /* Addition of source_username and source_port to msg_store chunk in v4, v1.5.6 */
+ }else if(db_version == 2){
/* Addition of disconnect_t to client chunk in v3. */
}else{
fclose(fptr);
diff --git a/src/persist.h b/src/persist.h
index 63a1a0cd..04f26342 100644
--- a/src/persist.h
+++ b/src/persist.h
@@ -17,7 +17,7 @@ Contributors:
#ifndef PERSIST_H
#define PERSIST_H
-#define MOSQ_DB_VERSION 3
+#define MOSQ_DB_VERSION 4
/* DB read/write */
const unsigned char magic[15] = {0x00, 0xB5, 0x00, 'm','o','s','q','u','i','t','t','o',' ','d','b'};
diff --git a/src/security_default.c b/src/security_default.c
index 5a886a53..44089d9b 100644
--- a/src/security_default.c
+++ b/src/security_default.c
@@ -610,6 +610,49 @@ static int acl__cleanup(struct mosquitto_db *db, bool reload)
return MOSQ_ERR_SUCCESS;
}
+
+int acl__find_acls(struct mosquitto_db *db, struct mosquitto *context)
+{
+ struct mosquitto__acl_user *acl_tail;
+ struct mosquitto__security_options *security_opts;
+
+ /* Associate user with its ACL, assuming we have ACLs loaded. */
+ if(db->config->per_listener_settings){
+ if(!context->listener){
+ return MOSQ_ERR_INVAL;
+ }
+ security_opts = &context->listener->security_options;
+ }else{
+ security_opts = &db->config->security_options;
+ }
+
+ if(security_opts->acl_list){
+ acl_tail = security_opts->acl_list;
+ while(acl_tail){
+ if(context->username){
+ if(acl_tail->username && !strcmp(context->username, acl_tail->username)){
+ context->acl_list = acl_tail;
+ break;
+ }
+ }else{
+ if(acl_tail->username == NULL){
+ context->acl_list = acl_tail;
+ break;
+ }
+ }
+ acl_tail = acl_tail->next;
+ }
+ if(context->username && context->acl_list == NULL){
+ return MOSQ_ERR_INVAL;
+ }
+ }else{
+ context->acl_list = NULL;
+ }
+
+ return MOSQ_ERR_SUCCESS;
+}
+
+
static int pwfile__parse(const char *file, struct mosquitto__unpwd **root)
{
FILE *pwfile;
diff --git a/src/subs.c b/src/subs.c
index 5953055c..bae4cb43 100644
--- a/src/subs.c
+++ b/src/subs.c
@@ -659,6 +659,27 @@ static int retain__process(struct mosquitto_db *db, struct mosquitto_msg_store *
return rc;
}
+ /* Check for original source access */
+ if(db->config->check_retain_source && retained->source_id){
+ struct mosquitto retain_ctxt;
+ memset(&retain_ctxt, 0, sizeof(struct mosquitto));
+
+ retain_ctxt.id = retained->source_id;
+ retain_ctxt.username = retained->source_username;
+ retain_ctxt.listener = retained->source_listener;
+
+ rc = acl__find_acls(db, &retain_ctxt);
+ if(rc) return rc;
+
+ rc = mosquitto_acl_check(db, &retain_ctxt, retained->topic, retained->payloadlen, UHPA_ACCESS(retained->payload, retained->payloadlen),
+ retained->qos, retained->retain, MOSQ_ACL_WRITE);
+ if(rc == MOSQ_ERR_ACL_DENIED){
+ return MOSQ_ERR_SUCCESS;
+ }else if(rc != MOSQ_ERR_SUCCESS){
+ return rc;
+ }
+ }
+
if (db->config->upgrade_outgoing_qos){
qos = sub_qos;
} else {
diff --git a/test/broker/04-retain-check-source-persist-diff-port.py b/test/broker/04-retain-check-source-persist-diff-port.py
new file mode 100755
index 00000000..c26ebe5f
--- /dev/null
+++ b/test/broker/04-retain-check-source-persist-diff-port.py
@@ -0,0 +1,117 @@
+#!/usr/bin/env python
+
+# Test for CVE-2018-12546, with the broker being stopped to write the persistence file, plus subscriber on different port.
+
+import inspect, os, sys
+# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
+cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
+if cmd_subfolder not in sys.path:
+ sys.path.insert(0, cmd_subfolder)
+
+import mosq_test
+import signal
+
+def write_config(filename, port1, port2, per_listener):
+ with open(filename, 'w') as f:
+ f.write("per_listener_settings %s\n" % (per_listener))
+ f.write("check_retain_source true\n")
+ f.write("port %d\n" % (port1))
+ f.write("acl_file %s\n" % (filename.replace('.conf', '.acl')))
+ f.write("persistence true\n")
+ f.write("persistence_file %s\n" % (filename.replace('.conf', '.db')))
+ f.write("listener %d\n" % (port2))
+
+def write_acl_1(filename, username):
+ with open(filename, 'w') as f:
+ if username is not None:
+ f.write('user %s\n' % (username))
+ f.write('topic readwrite test/topic\n')
+
+def write_acl_2(filename, username):
+ with open(filename, 'w') as f:
+ if username is not None:
+ f.write('user %s\n' % (username))
+ f.write('topic read test/topic\n')
+
+
+def do_test(per_listener, username):
+ conf_file = os.path.basename(__file__).replace('.py', '.conf')
+ write_config(conf_file, port1, port2, per_listener)
+
+ persistence_file = os.path.basename(__file__).replace('.py', '.db')
+ try:
+ os.remove(persistence_file)
+ except OSError:
+ pass
+
+ acl_file = os.path.basename(__file__).replace('.py', '.acl')
+ write_acl_1(acl_file, username)
+
+
+ rc = 1
+ keepalive = 60
+ connect_packet = mosq_test.gen_connect("retain-check", keepalive=keepalive, username=username)
+ connack_packet = mosq_test.gen_connack(rc=0)
+
+ if per_listener == "true":
+ u = None
+ else:
+ # If per listener is false, then the second client will be denied
+ # unless we provide a username
+ u = username
+
+ connect2_packet = mosq_test.gen_connect("retain-recv", keepalive=keepalive, username=u)
+ connack2_packet = mosq_test.gen_connack(rc=0)
+
+ mid = 1
+ publish_packet = mosq_test.gen_publish("test/topic", qos=0, payload="retained message", retain=True)
+ subscribe_packet = mosq_test.gen_subscribe(mid, "test/topic", 0)
+ suback_packet = mosq_test.gen_suback(mid, 0)
+
+ pingreq_packet = mosq_test.gen_pingreq()
+ pingresp_packet = mosq_test.gen_pingresp()
+
+ broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port1)
+
+ try:
+ sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port1)
+ sock.send(publish_packet)
+ sock.close()
+
+ sock = mosq_test.do_client_connect(connect2_packet, connack2_packet, port=port2)
+ mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback 1")
+
+ if mosq_test.expect_packet(sock, "publish", publish_packet):
+ sock.close()
+
+ # Remove "write" ability
+ write_acl_2(acl_file, username)
+ broker.terminate()
+ broker.wait()
+
+ broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port1)
+
+ sock = mosq_test.do_client_connect(connect2_packet, connack2_packet, port=port2)
+ mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback 2")
+ # If we receive the retained message here, it is a failure.
+ mosq_test.do_send_receive(sock, pingreq_packet, pingresp_packet, "pingresp")
+ rc = 0
+
+ sock.close()
+ finally:
+ broker.terminate()
+ broker.wait()
+ os.remove(conf_file)
+ os.remove(acl_file)
+ os.remove(persistence_file)
+ (stdo, stde) = broker.communicate()
+ if rc:
+ print(stde)
+ exit(rc)
+
+
+(port1, port2) = mosq_test.get_port(2)
+do_test("true", username=None)
+do_test("true", username="test")
+do_test("false", username=None)
+do_test("false", username="test")
diff --git a/test/broker/04-retain-check-source-persist.py b/test/broker/04-retain-check-source-persist.py
new file mode 100755
index 00000000..d383908e
--- /dev/null
+++ b/test/broker/04-retain-check-source-persist.py
@@ -0,0 +1,106 @@
+#!/usr/bin/env python
+
+# Test for CVE-2018-12546, with the broker being stopped to write the persistence file.
+
+import inspect, os, sys
+# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
+cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
+if cmd_subfolder not in sys.path:
+ sys.path.insert(0, cmd_subfolder)
+
+import mosq_test
+import signal
+
+def write_config(filename, port, per_listener):
+ with open(filename, 'w') as f:
+ f.write("per_listener_settings %s\n" % (per_listener))
+ f.write("check_retain_source true\n")
+ f.write("port %d\n" % (port))
+ f.write("acl_file %s\n" % (filename.replace('.conf', '.acl')))
+ f.write("persistence true\n")
+ f.write("persistence_file %s\n" % (filename.replace('.conf', '.db')))
+
+def write_acl_1(filename, username):
+ with open(filename, 'w') as f:
+ if username is not None:
+ f.write('user %s\n' % (username))
+ f.write('topic readwrite test/topic\n')
+
+def write_acl_2(filename, username):
+ with open(filename, 'w') as f:
+ if username is not None:
+ f.write('user %s\n' % (username))
+ f.write('topic read test/topic\n')
+
+
+def do_test(per_listener, username):
+ conf_file = os.path.basename(__file__).replace('.py', '.conf')
+ write_config(conf_file, port, per_listener)
+
+ persistence_file = os.path.basename(__file__).replace('.py', '.db')
+ try:
+ os.remove(persistence_file)
+ except OSError:
+ pass
+
+ acl_file = os.path.basename(__file__).replace('.py', '.acl')
+ write_acl_1(acl_file, username)
+
+
+ rc = 1
+ keepalive = 60
+ connect_packet = mosq_test.gen_connect("retain-check", keepalive=keepalive, username=username)
+ connack_packet = mosq_test.gen_connack(rc=0)
+
+ mid = 1
+ publish_packet = mosq_test.gen_publish("test/topic", qos=0, payload="retained message", retain=True)
+ subscribe_packet = mosq_test.gen_subscribe(mid, "test/topic", 0)
+ suback_packet = mosq_test.gen_suback(mid, 0)
+
+ pingreq_packet = mosq_test.gen_pingreq()
+ pingresp_packet = mosq_test.gen_pingresp()
+
+ broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
+
+ try:
+ sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
+ sock.send(publish_packet)
+ sock.close()
+
+ sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
+ mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback 1")
+
+ if mosq_test.expect_packet(sock, "publish", publish_packet):
+ sock.close()
+
+ # Remove "write" ability
+ write_acl_2(acl_file, username)
+ broker.terminate()
+ broker.wait()
+
+ broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
+
+ sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
+ mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback 2")
+ # If we receive the retained message here, it is a failure.
+ mosq_test.do_send_receive(sock, pingreq_packet, pingresp_packet, "pingresp")
+ rc = 0
+
+ sock.close()
+ finally:
+ broker.terminate()
+ broker.wait()
+ os.remove(conf_file)
+ os.remove(acl_file)
+ os.remove(persistence_file)
+ (stdo, stde) = broker.communicate()
+ if rc:
+ print(stde)
+ exit(rc)
+
+
+port = mosq_test.get_port()
+do_test("true", username=None)
+do_test("true", username="test")
+do_test("false", username=None)
+do_test("false", username="test")
diff --git a/test/broker/04-retain-check-source.py b/test/broker/04-retain-check-source.py
new file mode 100755
index 00000000..4ef51275
--- /dev/null
+++ b/test/broker/04-retain-check-source.py
@@ -0,0 +1,87 @@
+#!/usr/bin/env python
+
+# Test for CVE-2018-12546
+
+import inspect, os, sys
+# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
+cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
+if cmd_subfolder not in sys.path:
+ sys.path.insert(0, cmd_subfolder)
+
+import mosq_test
+import signal
+
+def write_config(filename, port, per_listener):
+ with open(filename, 'w') as f:
+ f.write("per_listener_settings %s\n" % (per_listener))
+ f.write("check_retain_source true\n")
+ f.write("port %d\n" % (port))
+ f.write("acl_file %s\n" % (filename.replace('.conf', '.acl')))
+
+def write_acl_1(filename):
+ with open(filename, 'w') as f:
+ f.write('topic readwrite test/topic\n')
+
+def write_acl_2(filename):
+ with open(filename, 'w') as f:
+ f.write('topic read test/topic\n')
+
+
+def do_test(per_listener):
+ conf_file = os.path.basename(__file__).replace('.py', '.conf')
+ write_config(conf_file, port, per_listener)
+
+ acl_file = os.path.basename(__file__).replace('.py', '.acl')
+ write_acl_1(acl_file)
+
+
+ rc = 1
+ keepalive = 60
+ connect_packet = mosq_test.gen_connect("retain-check", keepalive=keepalive)
+ connack_packet = mosq_test.gen_connack(rc=0)
+
+ mid = 1
+ publish_packet = mosq_test.gen_publish("test/topic", qos=0, payload="retained message", retain=True)
+ subscribe_packet = mosq_test.gen_subscribe(mid, "test/topic", 0)
+ suback_packet = mosq_test.gen_suback(mid, 0)
+
+ pingreq_packet = mosq_test.gen_pingreq()
+ pingresp_packet = mosq_test.gen_pingresp()
+
+ broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
+
+ try:
+ sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
+ sock.send(publish_packet)
+ sock.close()
+
+ sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
+ mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback 1")
+
+ if mosq_test.expect_packet(sock, "publish", publish_packet):
+ sock.close()
+
+ # Remove "write" ability
+ write_acl_2(acl_file)
+ broker.send_signal(signal.SIGHUP)
+
+ sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
+ mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback 2")
+ # If we receive the retained message here, it is a failure.
+ mosq_test.do_send_receive(sock, pingreq_packet, pingresp_packet, "pingresp")
+ rc = 0
+
+ sock.close()
+ finally:
+ os.remove(conf_file)
+ os.remove(acl_file)
+ broker.terminate()
+ broker.wait()
+ (stdo, stde) = broker.communicate()
+ if rc:
+ print(stde)
+ exit(rc)
+
+port = mosq_test.get_port()
+do_test("true")
+do_test("false")
diff --git a/test/broker/Makefile b/test/broker/Makefile
index d3b3e438..715b97c7 100644
--- a/test/broker/Makefile
+++ b/test/broker/Makefile
@@ -76,6 +76,9 @@ endif
./04-retain-qos1-qos0.py
./04-retain-qos0-clear.py
./04-retain-upgrade-outgoing-qos.py
+ ./04-retain-check-source.py
+ ./04-retain-check-source-persist.py
+ ./04-retain-check-source-persist-diff-port.py
05 :
./05-clean-session-qos1.py
diff --git a/test/broker/ptest.py b/test/broker/ptest.py
index 28f14f97..f21ae8ce 100755
--- a/test/broker/ptest.py
+++ b/test/broker/ptest.py
@@ -57,6 +57,9 @@ tests = [
(1, './04-retain-qos1-qos0.py'),
(1, './04-retain-qos0-clear.py'),
(1, './04-retain-upgrade-outgoing-qos.py'),
+ (1, './04-retain-check-source.py'),
+ (1, './04-retain-check-source-persist.py'),
+ (2, './04-retain-check-source-persist-diff-port.py'),
(1, './05-clean-session-qos1.py'),