@ -16,50 +16,51 @@ def do_test(start_broker, topic, succeeds):
publish_packet = mosq_test . gen_publish ( topic , qos = 0 , payload = " message " )
port = mosq_test . get_port ( )
broker = None
if start_broker :
broker = mosq_test . start_broker ( filename = os . path . basename ( __file__ ) , port = port )
try :
sock = mosq_test . do_client_connect ( connect_packet , connack_packet , timeout = 20 , port = port )
if succeeds == True :
if succeeds :
mosq_test . do_send_receive ( sock , subscribe_packet , suback_packet , " suback " )
mosq_test . do_send_receive ( sock , publish_packet , publish_packet , " publish " )
else :
mosq_test . do_send_receive ( sock , subscribe_packet , b " " , " suback " )
try :
mosq_test . do_send_receive ( sock , subscribe_packet , b " " , " suback " )
return 1
except BrokenPipeError :
pass
rc = 0
sock . close ( )
except mosq_test . TestError :
pass
finally :
if start_ broker:
if broker :
broker . terminate ( )
broker . wait ( )
( stdo , stde ) = broker . communicate ( )
if rc :
print ( stde . decode ( ' utf-8 ' ) )
exit ( rc )
else :
return rc
return rc
def all_tests ( start_broker = False ) :
rc = do_test ( start_broker , " / " * 200 , True ) # 200 max hierarchy limit
if rc :
return rc ;
return rc
rc = do_test ( start_broker , " abc/ " * 199 + " d " , True ) # 200 max hierarchy limit, longer overall string than 200
if rc :
return rc ;
return rc
rc = do_test ( start_broker , " / " * 201 , False ) # Exceeds 200 max hierarchy limit
if rc :
return rc ;
return rc
rc = do_test ( start_broker , " abc/ " * 201 + " d " , False ) # Exceeds 200 max hierarchy limit, longer overall string than 200
if rc :
return rc ;
return rc
return 0
if __name__ == ' __main__ ' :
all_tests( True )
sys. exit ( all_tests( True ) )