diff --git a/ChangeLog.txt b/ChangeLog.txt index 0d18bd1f..d1bb42d5 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -9,6 +9,8 @@ Broker: - Fix some Coverity Scan reported errors that could occur when the broker was already failing to start. - Fix broken mosquitto_passwd on FreeBSD. Closes #1032. +- Fix delayed bridge local subscriptions causing missing messages. + Closes #1174. Library: - Use higher resolution timer for random initialisation of client id diff --git a/src/bridge.c b/src/bridge.c index 1a6303ed..c232b5da 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -115,6 +115,7 @@ int bridge__connect_step1(struct mosquitto_db *db, struct mosquitto *context) char *notification_topic; int notification_topic_len; uint8_t notification_payload; + int i; if(!context || !context->bridge) return MOSQ_ERR_INVAL; @@ -140,6 +141,13 @@ int bridge__connect_step1(struct mosquitto_db *db, struct mosquitto *context) */ sub__clean_session(db, context); + for(i=0; ibridge->topic_count; i++){ + if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){ + log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, context->bridge->topics[i].local_topic); + if(sub__add(db, context, context->bridge->topics[i].local_topic, context->bridge->topics[i].qos, &db->subs)) return 1; + } + } + if(context->bridge->notifications){ if(context->bridge->notification_topic){ if(!context->bridge->initial_notification_done){ @@ -300,9 +308,6 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context) if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){ log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, context->bridge->topics[i].local_topic); if(sub__add(db, context, context->bridge->topics[i].local_topic, context->bridge->topics[i].qos, &db->subs)) return 1; - sub__retain_queue(db, context, - context->bridge->topics[i].local_topic, - context->bridge->topics[i].qos); } } diff --git a/src/handle_connack.c b/src/handle_connack.c index 9e16e4d4..a563189c 100644 --- a/src/handle_connack.c +++ b/src/handle_connack.c @@ -92,8 +92,6 @@ int handle__connack(struct mosquitto_db *db, struct mosquitto *context) } for(i=0; ibridge->topic_count; i++){ if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){ - log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, context->bridge->topics[i].local_topic); - if(sub__add(db, context, context->bridge->topics[i].local_topic, context->bridge->topics[i].qos, &db->subs)) return 1; sub__retain_queue(db, context, context->bridge->topics[i].local_topic, context->bridge->topics[i].qos); diff --git a/test/broker/06-bridge-b2br-late-connection-retain.py b/test/broker/06-bridge-b2br-late-connection-retain.py new file mode 100755 index 00000000..8c52d667 --- /dev/null +++ b/test/broker/06-bridge-b2br-late-connection-retain.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python + +# Does a bridge queue up retained messages correctly if the remote broker starts up late? + +import socket + +import inspect, os, sys +# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder +cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) +if cmd_subfolder not in sys.path: + sys.path.insert(0, cmd_subfolder) + +import mosq_test + +def write_config1(filename, persistence_file, port1, port2): + with open(filename, 'w') as f: + f.write("port %d\n" % (port2)) + f.write("\n") + f.write("persistence true\n") + f.write("persistence_file %s\n" % (persistence_file)) + +def write_config2(filename, persistence_file, port1, port2): + with open(filename, 'w') as f: + f.write("port %d\n" % (port2)) + f.write("\n") + f.write("connection bridge_sample\n") + f.write("address 127.0.0.1:%d\n" % (port1)) + f.write("topic bridge/# out 1\n") + f.write("notifications false\n") + f.write("bridge_attempt_unsubscribe false\n") + f.write("persistence true\n") + f.write("persistence_file %s\n" % (persistence_file)) + +(port1, port2) = mosq_test.get_port(2) +conf_file = os.path.basename(__file__).replace('.py', '.conf') +persistence_file = os.path.basename(__file__).replace('.py', '.db') + +rc = 1 +keepalive = 60 +client_id = socket.gethostname()+".bridge_sample" +connect_packet = mosq_test.gen_connect(client_id, keepalive=keepalive, clean_session=False, proto_ver=128+4) +connack_packet = mosq_test.gen_connack(rc=0) + +c_connect_packet = mosq_test.gen_connect("client", keepalive=keepalive) +c_connack_packet = mosq_test.gen_connack(rc=0) + +mid = 1 +publish_packet = mosq_test.gen_publish("bridge/test", qos=1, mid=mid, payload="message", retain=True) +puback_packet = mosq_test.gen_puback(mid) + +pingreq_packet = mosq_test.gen_pingreq() +pingresp_packet = mosq_test.gen_pingresp() + +ssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +ssock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +ssock.settimeout(40) +ssock.bind(('', port1)) +ssock.listen(5) + +write_config1(conf_file, persistence_file, port1, port2) +broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port2, use_conf=True) + +try: + client = mosq_test.do_client_connect(c_connect_packet, c_connack_packet, timeout=20, port=port2) + mosq_test.do_send_receive(client, publish_packet, puback_packet, "puback") + client.close() + + broker.terminate() + broker.wait() + + # Restart, with retained message in place + write_config2(conf_file, persistence_file, port1, port2) + broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port2, use_conf=True) + + (bridge, address) = ssock.accept() + bridge.settimeout(20) + + if mosq_test.expect_packet(bridge, "connect", connect_packet): + bridge.send(connack_packet) + + if mosq_test.expect_packet(bridge, "publish", publish_packet): + bridge.send(puback_packet) + # Guard against multiple retained messages of the same type by + # sending a pingreq to give us something to expect back. If we get + # a publish, it's a fail. + mosq_test.do_send_receive(bridge, pingreq_packet, pingresp_packet, "pingresp") + rc = 0 + + bridge.close() +finally: + os.remove(conf_file) + try: + bridge.close() + except NameError: + pass + + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + os.remove(persistence_file) + if rc: + print(stde) + ssock.close() + +exit(rc) + diff --git a/test/broker/06-bridge-b2br-late-connection.py b/test/broker/06-bridge-b2br-late-connection.py new file mode 100755 index 00000000..59d5c345 --- /dev/null +++ b/test/broker/06-bridge-b2br-late-connection.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python + +# Does a bridge queue up messages correctly if the remote broker starts up late? + +import socket + +import inspect, os, sys +# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder +cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) +if cmd_subfolder not in sys.path: + sys.path.insert(0, cmd_subfolder) + +import mosq_test + +def write_config(filename, port1, port2): + with open(filename, 'w') as f: + f.write("port %d\n" % (port2)) + f.write("\n") + f.write("connection bridge_sample\n") + f.write("address 127.0.0.1:%d\n" % (port1)) + f.write("topic bridge/# out 1\n") + f.write("notifications false\n") + f.write("bridge_attempt_unsubscribe false\n") + +(port1, port2) = mosq_test.get_port(2) +conf_file = os.path.basename(__file__).replace('.py', '.conf') +write_config(conf_file, port1, port2) + +rc = 1 +keepalive = 60 +client_id = socket.gethostname()+".bridge_sample" +connect_packet = mosq_test.gen_connect(client_id, keepalive=keepalive, clean_session=False, proto_ver=128+4) +connack_packet = mosq_test.gen_connack(rc=0) + +c_connect_packet = mosq_test.gen_connect("client", keepalive=keepalive) +c_connack_packet = mosq_test.gen_connack(rc=0) + +mid = 1 +publish_packet = mosq_test.gen_publish("bridge/test", qos=1, mid=mid, payload="message") +puback_packet = mosq_test.gen_puback(mid) + +ssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +ssock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +ssock.settimeout(40) +ssock.bind(('', port1)) +ssock.listen(5) + +broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port2, use_conf=True) + +try: + (bridge, address) = ssock.accept() + bridge.settimeout(20) + + client = mosq_test.do_client_connect(c_connect_packet, c_connack_packet, timeout=20, port=port2) + mosq_test.do_send_receive(client, publish_packet, puback_packet, "puback") + client.close() + # We've now sent a message to the broker that should be delivered to us via the bridge + + if mosq_test.expect_packet(bridge, "connect", connect_packet): + bridge.send(connack_packet) + + if mosq_test.expect_packet(bridge, "publish", publish_packet): + rc = 0 + + bridge.close() +finally: + os.remove(conf_file) + try: + bridge.close() + except NameError: + pass + + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + if rc: + print(stde) + ssock.close() + +exit(rc) + diff --git a/test/broker/Makefile b/test/broker/Makefile index be3b95e4..37601103 100644 --- a/test/broker/Makefile +++ b/test/broker/Makefile @@ -94,6 +94,8 @@ endif ./06-bridge-b2br-remapping.py ./06-bridge-br2b-remapping.py ./06-bridge-per-listener-settings.py + ./06-bridge-b2br-late-connection.py + ./06-bridge-b2br-late-connection-retain.py 07 : ./07-will-qos0.py diff --git a/test/broker/ptest.py b/test/broker/ptest.py index e2d1142a..74c441d9 100755 --- a/test/broker/ptest.py +++ b/test/broker/ptest.py @@ -73,6 +73,8 @@ tests = [ (2, './06-bridge-b2br-remapping.py'), (2, './06-bridge-br2b-remapping.py'), (3, './06-bridge-per-listener-settings.py'), + (2, './06-bridge-b2br-late-connection.py'), + (2, './06-bridge-b2br-late-connection-retain.py'), (1, './07-will-qos0.py'), (1, './07-will-null.py'),