Handle some incoming PUBLISH properties.

PAYLOAD_FORMAT_INDICATOR, CORRELATION_DATA, USER_PROPERTY, CONTENT_TYPE
are now all passed on to subscribing clients from an incoming PUBLISH
only (not from Wills). The other PUBLISH properties are silently
dropped.
pull/1022/head
Roger A. Light 7 years ago
parent 6996fd450a
commit 12fa336140

@ -46,6 +46,8 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
int slen;
char *topic_mount;
struct mqtt5__property *properties = NULL;
struct mqtt5__property *p, *p_prev;
struct mqtt5__property *msg_properties = NULL, *msg_properties_last;
#ifdef WITH_BRIDGE
char *topic_temp;
@ -137,10 +139,62 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
}
}
/* Handle properties */
if(context->protocol == mosq_p_mqtt5){
rc = property__read_all(PUBLISH, &context->in_packet, &properties);
if(rc) return rc;
p = properties;
p_prev = NULL;
msg_properties = NULL;
msg_properties_last = NULL;
while(p){
switch(p->identifier){
case PROP_PAYLOAD_FORMAT_INDICATOR:
case PROP_CORRELATION_DATA:
case PROP_USER_PROPERTY:
case PROP_CONTENT_TYPE:
if(msg_properties){
msg_properties_last->next = p;
msg_properties_last = p;
}else{
msg_properties = p;
msg_properties_last = p;
}
if(p_prev){
p_prev->next = p->next;
msg_properties_last->next = NULL;
p = p_prev->next;
}else{
properties = p->next;
msg_properties_last->next = NULL;
p = properties;
}
break;
case PROP_TOPIC_ALIAS:
p = p->next;
break;
case PROP_RESPONSE_TOPIC:
p = p->next;
break;
case PROP_MESSAGE_EXPIRY_INTERVAL:
p = p->next;
break;
case PROP_SUBSCRIPTION_IDENTIFIER:
p = p->next;
break;
default:
p = p->next;
break;
}
}
}
property__free_all(&properties);
payloadlen = context->in_packet.remaining_length - context->in_packet.pos;
G_PUB_BYTES_RECEIVED_INC(payloadlen);
@ -149,7 +203,6 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
topic_mount = mosquitto__malloc(len+1);
if(!topic_mount){
mosquitto__free(topic);
property__free_all(&properties);
return MOSQ_ERR_NOMEM;
}
snprintf(topic_mount, len, "%s%s", context->listener->mount_point, topic);
@ -166,13 +219,11 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
}
if(UHPA_ALLOC(payload, payloadlen+1) == 0){
mosquitto__free(topic);
property__free_all(&properties);
return MOSQ_ERR_NOMEM;
}
if(packet__read_bytes(&context->in_packet, UHPA_ACCESS(payload, payloadlen), payloadlen)){
mosquitto__free(topic);
UHPA_FREE(payload, payloadlen);
property__free_all(&properties);
return 1;
}
}
@ -185,7 +236,6 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
}else if(rc != MOSQ_ERR_SUCCESS){
mosquitto__free(topic);
UHPA_FREE(payload, payloadlen);
property__free_all(&properties);
return rc;
}
@ -195,10 +245,10 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
}
if(!stored){
dup = 0;
if(db__message_store(db, context->id, mid, topic, qos, payloadlen, &payload, retain, &stored, properties, 0)){
if(db__message_store(db, context->id, mid, topic, qos, payloadlen, &payload, retain, &stored, msg_properties, 0)){
return 1;
}
properties = NULL; /* Now belongs to db__message_store() */
msg_properties = NULL; /* Now belongs to db__message_store() */
}else{
mosquitto__free(topic);
topic = stored->topic;
@ -232,7 +282,6 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
return rc;
process_bad_message:
mosquitto__free(topic);
property__free_all(&properties);
UHPA_FREE(payload, payloadlen);
switch(qos){
case 0:

@ -0,0 +1,54 @@
#!/usr/bin/env python
# Test whether a client subscribed to a topic receives its own message sent to that topic.
# Does the Payload Format Indicator property get sent through?
# MQTT v5
import inspect, os, sys
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import mosq_test
import mqtt5_props
rc = 1
mid = 53
keepalive = 60
connect_packet = mosq_test.gen_connect("subpub-qos0-test", keepalive=keepalive, proto_ver=5)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
subscribe_packet = mosq_test.gen_subscribe(mid, "subpub/qos0", 0, proto_ver=5)
suback_packet = mosq_test.gen_suback(mid, 0, proto_ver=5)
props = mqtt5_props.gen_byte_prop(mqtt5_props.PROP_PAYLOAD_FORMAT_INDICATOR, 0xed)
props = props+mqtt5_props.gen_uint16_prop(mqtt5_props.PROP_TOPIC_ALIAS, 1)
props = mqtt5_props.prop_finalise(props)
publish_packet_out = mosq_test.gen_publish("subpub/qos0", qos=0, payload="message", proto_ver=5, properties=props)
props = mqtt5_props.gen_byte_prop(mqtt5_props.PROP_PAYLOAD_FORMAT_INDICATOR, 0xed)
props = mqtt5_props.prop_finalise(props)
publish_packet_expected = mosq_test.gen_publish("subpub/qos0", qos=0, payload="message", proto_ver=5, properties=props)
port = mosq_test.get_port()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)
try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port)
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
mosq_test.do_send_receive(sock, publish_packet_out, publish_packet_expected, "publish")
rc = 0
sock.close()
finally:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde)
exit(rc)

@ -53,6 +53,7 @@ endif
./02-unsubscribe-qos1.py
./02-unsubscribe-qos2.py
./02-unsubscribe-qos2-v5.py
./02-subpub-qos0-payload-format-v5.py
./02-unsubscribe-invalid-no-topic.py
./02-subscribe-invalid-utf8.py
./02-subscribe-persistence-flipflop.py

@ -32,6 +32,7 @@ tests = [
(1, './02-subpub-qos0-v5.py'),
(1, './02-subpub-qos1-v5.py'),
(1, './02-subpub-qos2-v5.py'),
(1, './02-subpub-qos0-payload-format-v5.py'),
(1, './02-unsubscribe-qos0.py'),
(1, './02-unsubscribe-qos1.py'),
(1, './02-unsubscribe-qos2.py'),

@ -363,15 +363,17 @@ def gen_connack(resv=0, rc=0, proto_ver=4):
return packet
def gen_publish(topic, qos, payload=None, retain=False, dup=False, mid=0, proto_ver=4):
def gen_publish(topic, qos, payload=None, retain=False, dup=False, mid=0, proto_ver=4, properties=None):
rl = 2+len(topic)
pack_format = "H"+str(len(topic))+"s"
if qos > 0:
rl = rl + 2
pack_format = pack_format + "H"
if proto_ver == 5:
rl += 1
pack_format = pack_format + "B"
rl += len(properties)
# This will break if len(properties) > 127
pack_format = pack_format + "%ds"%(len(properties))
if payload != None:
rl = rl + len(payload)
@ -389,9 +391,9 @@ def gen_publish(topic, qos, payload=None, retain=False, dup=False, mid=0, proto_
if proto_ver == 5:
if qos > 0:
return struct.pack("!B" + str(len(rlpacked))+"s" + pack_format, cmd, rlpacked, len(topic), topic, mid, 0, payload)
return struct.pack("!B" + str(len(rlpacked))+"s" + pack_format, cmd, rlpacked, len(topic), topic, mid, properties, payload)
else:
return struct.pack("!B" + str(len(rlpacked))+"s" + pack_format, cmd, rlpacked, len(topic), topic, 0, payload)
return struct.pack("!B" + str(len(rlpacked))+"s" + pack_format, cmd, rlpacked, len(topic), topic, properties, payload)
else:
if qos > 0:
return struct.pack("!B" + str(len(rlpacked))+"s" + pack_format, cmd, rlpacked, len(topic), topic, mid, payload)

@ -0,0 +1,66 @@
import struct
PROP_PAYLOAD_FORMAT_INDICATOR = 1
PROP_MESSAGE_EXPIRY_INTERVAL = 2
PROP_CONTENT_TYPE = 3
PROP_RESPONSE_TOPIC = 8
PROP_CORRELATION_DATA = 9
PROP_SUBSCRIPTION_IDENTIFIER = 11
PROP_SESSION_EXPIRY_INTERVAL = 17
PROP_ASSIGNED_CLIENT_IDENTIFIER = 18
PROP_SERVER_KEEP_ALIVE = 19
PROP_AUTHENTICATION_METHOD = 21
PROP_AUTHENTICATION_DATA = 22
PROP_REQUEST_PROBLEM_INFO = 23
PROP_WILL_DELAY_INTERVAL = 24
PROP_REQUEST_RESPONSE_INFO = 25
PROP_RESPONSE_INFO = 26
PROP_SERVER_REFERENCE = 28
PROP_REASON_STRING = 31
PROP_RECEIVE_MAXIMUM = 33
PROP_TOPIC_ALIAS_MAXIMUM = 34
PROP_TOPIC_ALIAS = 35
PROP_MAXIMUM_QOS = 36
PROP_RETAIN_AVAILABLE = 37
PROP_USER_PROPERTY = 38
PROP_MAXIMUM_PACKET_SIZE = 39
PROP_WILDCARD_SUB_AVAILABLE = 40
PROP_SUBSCRIPTION_ID_AVAILABLE = 41
PROP_SHARED_SUB_AVAILABLE = 42
def gen_byte_prop(identifier, byte):
prop = struct.pack('BB', identifier, byte)
return prop
def gen_uint16_prop(identifier, word):
prop = struct.pack('!BH', identifier, word)
return prop
def gen_uint32_prop(identifier, word):
prop = struct.pack('!BI', identifier, word)
return prop
def gen_string_prop(identifier, s):
prop = struct.pack('!BH%ds'%(len(s)), identifier, len(s), s)
return prop
def gen_string_pair_prop(identifier, s1, s2):
prop = struct.pack('!BH%dsH%ds'%(len(s1), len(s2)), identifier, len(s1), s1, len(s2), s2)
return prop
def pack_varint(varint):
s = ""
while True:
byte = varint % 128
varint = varint // 128
# If there are more digits to encode, set the top bit of this digit
if varint > 0:
byte = byte | 0x80
s = s + struct.pack("!B", byte)
if varint == 0:
return s
def prop_finalise(props):
return pack_varint(len(props)) + props
Loading…
Cancel
Save