|
|
|
@ -61,40 +61,14 @@ struct sub__token {
|
|
|
|
|
uint16_t topic_len;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
static int subs__send_msg(struct mosquitto_db *db, struct mosquitto *context, int sub_qos, int retain, int msg_qos, struct mosquitto_msg_store *stored)
|
|
|
|
|
{
|
|
|
|
|
int final_qos;
|
|
|
|
|
bool final_retain;
|
|
|
|
|
uint16_t mid;
|
|
|
|
|
|
|
|
|
|
if(db->config->upgrade_outgoing_qos){
|
|
|
|
|
final_qos = sub_qos;
|
|
|
|
|
}else{
|
|
|
|
|
final_qos = (msg_qos > sub_qos)?sub_qos:msg_qos;
|
|
|
|
|
}
|
|
|
|
|
if(final_qos > 0){
|
|
|
|
|
mid = mosquitto__mid_generate(context);
|
|
|
|
|
}else{
|
|
|
|
|
mid = 0;
|
|
|
|
|
}
|
|
|
|
|
if(context->is_bridge){
|
|
|
|
|
/* If we know the client is a bridge then we should set retain
|
|
|
|
|
* even if the message is fresh. If we don't do this, retained
|
|
|
|
|
* messages won't be propagated. */
|
|
|
|
|
final_retain = retain;
|
|
|
|
|
}else{
|
|
|
|
|
/* Client is not a bridge and this isn't a stale message so
|
|
|
|
|
* retain should be false. */
|
|
|
|
|
final_retain = false;
|
|
|
|
|
}
|
|
|
|
|
return db__message_insert(db, context, mid, mosq_md_out, final_qos, final_retain, stored);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int subs__process(struct mosquitto_db *db, struct mosquitto__subhier *hier, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain)
|
|
|
|
|
{
|
|
|
|
|
int rc = 0;
|
|
|
|
|
int rc2;
|
|
|
|
|
int client_qos, msg_qos;
|
|
|
|
|
uint16_t mid;
|
|
|
|
|
struct mosquitto__subleaf *leaf;
|
|
|
|
|
bool client_retain;
|
|
|
|
|
|
|
|
|
|
leaf = hier->subs;
|
|
|
|
|
|
|
|
|
@ -133,7 +107,33 @@ static int subs__process(struct mosquitto_db *db, struct mosquitto__subhier *hie
|
|
|
|
|
leaf = leaf->next;
|
|
|
|
|
continue;
|
|
|
|
|
}else if(rc2 == MOSQ_ERR_SUCCESS){
|
|
|
|
|
if(subs__send_msg(db, leaf->context, leaf->qos, retain, qos, stored) != 0) return 1;
|
|
|
|
|
client_qos = leaf->qos;
|
|
|
|
|
|
|
|
|
|
if(db->config->upgrade_outgoing_qos){
|
|
|
|
|
msg_qos = client_qos;
|
|
|
|
|
}else{
|
|
|
|
|
if(qos > client_qos){
|
|
|
|
|
msg_qos = client_qos;
|
|
|
|
|
}else{
|
|
|
|
|
msg_qos = qos;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if(msg_qos){
|
|
|
|
|
mid = mosquitto__mid_generate(leaf->context);
|
|
|
|
|
}else{
|
|
|
|
|
mid = 0;
|
|
|
|
|
}
|
|
|
|
|
if(leaf->context->is_bridge){
|
|
|
|
|
/* If we know the client is a bridge then we should set retain
|
|
|
|
|
* even if the message is fresh. If we don't do this, retained
|
|
|
|
|
* messages won't be propagated. */
|
|
|
|
|
client_retain = retain;
|
|
|
|
|
}else{
|
|
|
|
|
/* Client is not a bridge and this isn't a stale message so
|
|
|
|
|
* retain should be false. */
|
|
|
|
|
client_retain = false;
|
|
|
|
|
}
|
|
|
|
|
if(db__message_insert(db, leaf->context, mid, mosq_md_out, msg_qos, client_retain, stored) == 1) rc = 1;
|
|
|
|
|
}else{
|
|
|
|
|
return 1; /* Application error */
|
|
|
|
|
}
|
|
|
|
@ -510,14 +510,11 @@ int sub__remove(struct mosquitto_db *db, struct mosquitto *context, const char *
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store **stored)
|
|
|
|
|
{
|
|
|
|
|
int rc = MOSQ_ERR_SUCCESS;
|
|
|
|
|
int rc = 0;
|
|
|
|
|
struct mosquitto__subhier *subhier;
|
|
|
|
|
struct sub__token *tokens = NULL;
|
|
|
|
|
struct mosquitto *context;
|
|
|
|
|
int sub_qos;
|
|
|
|
|
|
|
|
|
|
assert(db);
|
|
|
|
|
assert(topic);
|
|
|
|
@ -530,29 +527,6 @@ int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const ch
|
|
|
|
|
*/
|
|
|
|
|
(*stored)->ref_count++;
|
|
|
|
|
|
|
|
|
|
if(db->config->allow_direct_messages){
|
|
|
|
|
if(!strncmp(UHPA_ACCESS_TOPIC(tokens), "$CLIENT", tokens->topic_len)){
|
|
|
|
|
tokens = tokens->next;
|
|
|
|
|
if(tokens && !strncmp(UHPA_ACCESS_TOPIC(tokens), "direct", tokens->topic_len)){
|
|
|
|
|
tokens = tokens->next;
|
|
|
|
|
if(tokens && tokens->topic_len > 0){
|
|
|
|
|
HASH_FIND(hh_id, db->contexts_by_id, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, context);
|
|
|
|
|
if(context){
|
|
|
|
|
if(db->config->direct_message_qos == -1){
|
|
|
|
|
sub_qos = qos;
|
|
|
|
|
}else{
|
|
|
|
|
sub_qos = db->config->direct_message_qos;
|
|
|
|
|
}
|
|
|
|
|
rc = subs__send_msg(db, context, sub_qos, retain, qos, *stored);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
db__msg_store_deref(db, stored);
|
|
|
|
|
sub__topic_tokens_free(tokens);
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
subhier = db->subs.children;
|
|
|
|
|
while(subhier){
|
|
|
|
|
if(!strcmp(UHPA_ACCESS_TOPIC(subhier), UHPA_ACCESS_TOPIC(tokens))){
|
|
|
|
|