Add mosquitto_ctrl tests
parent
89cca44c17
commit
9cc5d1d6ff
@ -0,0 +1,102 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Test parsing of command line args and errors. Does not test arg functionality.
|
||||
|
||||
from mosq_test_helper import *
|
||||
|
||||
def do_test(args, rc_expected, response=None):
|
||||
proc = subprocess.run([mosq_test.get_build_root()+"/apps/mosquitto_ctrl/mosquitto_ctrl"]
|
||||
+ args,
|
||||
env=env, capture_output=True, encoding='utf-8', timeout=2)
|
||||
|
||||
if response is not None:
|
||||
if proc.stderr != response:
|
||||
print(len(proc.stderr))
|
||||
print(len(response))
|
||||
raise ValueError(proc.stderr)
|
||||
|
||||
if proc.returncode != rc_expected:
|
||||
raise ValueError(args)
|
||||
|
||||
env = dict(os.environ)
|
||||
env['LD_LIBRARY_PATH'] = mosq_test.get_build_root() + '/lib'
|
||||
|
||||
do_test([], 1)
|
||||
do_test(["broker"], 1)
|
||||
do_test(["-A"], 1, response="Error: -A argument given but no address specified.\n\n")
|
||||
do_test(["-A", "127.0.0.1"], 1) # Gives generic help
|
||||
do_test(["--cafile"], 1, response="Error: --cafile argument given but no file specified.\n\n")
|
||||
do_test(["--cafile", mosq_test.get_build_root()+"/test/ssl/all-ca.crt"], 1) # Gives generic help
|
||||
do_test(["--capath"], 1, response="Error: --capath argument given but no directory specified.\n\n")
|
||||
do_test(["--capath", mosq_test.get_build_root()+"/test/ssl"], 1) # Gives generic help
|
||||
do_test(["--cert"], 1, response="Error: --cert argument given but no file specified.\n\n")
|
||||
do_test(["--cert", mosq_test.get_build_root()+"/test/ssl/client.crt"], 1, response="Error: Both certfile and keyfile must be provided if one of them is set.\n")
|
||||
do_test(["--key"], 1, response="Error: --key argument given but no file specified.\n\n")
|
||||
do_test(["--key", mosq_test.get_build_root()+"/test/ssl/client.key"], 1, response="Error: Both certfile and keyfile must be provided if one of them is set.\n")
|
||||
do_test(["--ciphers"], 1, response="Error: --ciphers argument given but no ciphers specified.\n\n")
|
||||
do_test(["--ciphers", "DEFAULT"], 1) # Gives generic help
|
||||
do_test(["--debug"], 1) # Gives generic help
|
||||
do_test(["-f"], 1, response="Error: -f argument given but no data file specified.\n\n")
|
||||
do_test(["-f", mosq_test.get_build_root()+"/test/ssl/test"], 1) # Gives generic help
|
||||
do_test(["--help"], 1) # Gives generic help
|
||||
do_test(["--host"], 1, response="Error: -h argument given but no host specified.\n\n")
|
||||
do_test(["--host", "127.0.0.1"], 1) # Gives generic help
|
||||
do_test(["-i"], 1, response="Error: -i argument given but no id specified.\n\n")
|
||||
do_test(["-i", "clientid"], 1) # Gives generic help
|
||||
do_test(["--insecure"], 1) # Gives generic help
|
||||
do_test(["--keyform"], 1, response="Error: --keyform argument given but no keyform specified.\n\n")
|
||||
do_test(["--keyform", "key"], 1) # Gives generic help
|
||||
do_test(['-L'], 1, response="Error: -L argument given but no URL specified.\n\n")
|
||||
do_test(['-L', 'invalid://'], 1, response="Error: Unsupported URL scheme.\n\n")
|
||||
do_test(['-L', 'mqtt://localhost'], 1, response="Error: Invalid URL for -L argument specified - topic missing.\n")
|
||||
do_test(['-L', 'mqtts://localhost'], 1, response="Error: Invalid URL for -L argument specified - topic missing.\n")
|
||||
do_test(['-L', 'mqtts://localhost/'], 1)
|
||||
do_test(['-L', 'mqtts://:@localhost/topic'], 1, response="Error: Empty username in URL.\n")
|
||||
do_test(['-L', 'mqtts://localhost:/topic'], 1, response="Error: Empty port in URL.\n")
|
||||
do_test(['-L', 'mqtts://username:password@localhost:1887/topic'], 1)
|
||||
do_test(["-o"], 1, response="Error: -o argument given but no options file specified.\n\n")
|
||||
do_test(["-o", "file"], 1) # Gives generic help
|
||||
do_test(["-p"], 1, response="Error: -p argument given but no port specified.\n\n")
|
||||
do_test(["-p", "1887"], 1) # Gives generic help
|
||||
do_test(["-p", "-1"], 1, response="Error: Invalid port given: -1\n")
|
||||
do_test(["-p", "65536"], 1, response="Error: Invalid port given: 65536\n")
|
||||
do_test(["-P"], 1, response="Error: -P argument given but no password specified.\n\n")
|
||||
do_test(["-P", "password"], 1) # Gives generic help
|
||||
do_test(["-q"], 1, response="Error: -q argument given but no QoS specified.\n\n")
|
||||
do_test(["-q", "1"], 1) # Gives generic help
|
||||
do_test(["-q", "-1"], 1, response="Error: Invalid QoS given: -1\n")
|
||||
do_test(["-q", "3"], 1, response="Error: Invalid QoS given: 3\n")
|
||||
do_test(["--quiet"], 1) # Gives generic help
|
||||
do_test(["--tls-alpn"], 1, response="Error: --tls-alpn argument given but no protocol specified.\n\n")
|
||||
do_test(["--tls-alpn", "protocol"], 1) # Gives generic help
|
||||
do_test(["--tls-engine"], 1, response="Error: --tls-engine argument given but no engine_id specified.\n\n")
|
||||
do_test(["--tls-engine", "engine"], 1) # Gives generic help
|
||||
do_test(["--tls-engine-kpass-sha1"], 1, response="Error: --tls-engine-kpass-sha1 argument given but no kpass sha1 specified.\n\n")
|
||||
do_test(["--tls-engine-kpass-sha1", "sha1"], 1) # Gives generic help
|
||||
do_test(["--tls-version"], 1, response="Error: --tls-version argument given but no version specified.\n\n")
|
||||
do_test(["--tls-version", "tlsv1.3"], 1) # Gives generic help
|
||||
do_test(["--username"], 1, response="Error: -u argument given but no username specified.\n\n")
|
||||
do_test(["--username", "username"], 1) # Gives generic help
|
||||
do_test(["--unix"], 1, response="Error: --unix argument given but no socket path specified.\n\n")
|
||||
do_test(["--unix", "sock"], 1) # Gives generic help
|
||||
do_test(["-V"], 1, response="Error: --protocol-version argument given but no version specified.\n\n")
|
||||
do_test(["-V", "2"], 1, response="Error: Invalid protocol version argument given.\n\n")
|
||||
do_test(["-V", "31"], 1) # Gives generic help
|
||||
do_test(["-V", "311"], 1) # Gives generic help
|
||||
do_test(["-V", "5"], 1) # Gives generic help
|
||||
do_test(["-V", "6"], 1, response="Error: Invalid protocol version argument given.\n\n")
|
||||
do_test(["--verbose"], 1) # Gives generic help
|
||||
do_test(["--version"], 1) # Gives generic help
|
||||
do_test(["--unknown"], 1, response="Error: Unknown option '--unknown'.\n")
|
||||
|
||||
# Broker
|
||||
do_test(["broker", "unknown"], 13, response="Command 'unknown' not recognised.\n")
|
||||
|
||||
# Dynsec
|
||||
do_test(["dynsec", "unknown"], 13, response="Command 'unknown' not recognised.\n")
|
||||
do_test(["-f", "file", "dynsec", "setClientPassword", "admin", "admin", "-i"], 3, response="Error: -i argument given, but no iterations provided.\nError: Invalid input.\n")
|
||||
do_test(["-f", "file", "dynsec", "setClientPassword", "admin", "admin", "-c"], 3, response="Error: Unknown argument: -c\nError: Invalid input.\n")
|
||||
do_test(["dynsec", "createClient", "client", "-c"], 3, response="Error: -c argument given, but no clientid provided.\nError: Invalid input.\n")
|
||||
do_test(["dynsec", "createClient", "client", "-p"], 3, response="Error: -p argument given, but no password provided.\nError: Invalid input.\n")
|
||||
|
||||
exit(0)
|
@ -0,0 +1,97 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# mosquitto_ctrl broker
|
||||
|
||||
from mosq_test_helper import *
|
||||
import json
|
||||
import shutil
|
||||
|
||||
def write_config(filename, ports):
|
||||
with open(filename, 'w') as f:
|
||||
f.write("enable_control_api true\n")
|
||||
f.write(f"global_plugin {mosq_test.get_build_root()}/plugins/dynamic-security/mosquitto_dynamic_security.so\n")
|
||||
f.write(f"plugin_opt_config_file {ports[0]}/dynamic-security.json\n")
|
||||
f.write("allow_anonymous false\n")
|
||||
f.write(f"listener {ports[0]}\n")
|
||||
f.write(f"listener {ports[1]}\n")
|
||||
f.write(f"certfile {mosq_test.get_build_root()}/test/ssl/server.crt\n")
|
||||
f.write(f"keyfile {mosq_test.get_build_root()}/test/ssl/server.key\n")
|
||||
|
||||
def ctrl_cmd(cmd, args, ports, response=None):
|
||||
opts = ["-u", "admin",
|
||||
"-P", "admin",
|
||||
"-V", "5"]
|
||||
|
||||
if response is None:
|
||||
opts += [
|
||||
"-p", str(ports[0]),
|
||||
"-q", "1"
|
||||
]
|
||||
capture_output = False
|
||||
else:
|
||||
opts += ["-p", str(ports[1])]
|
||||
opts += ["--cafile", f"{mosq_test.get_build_root()}/test/ssl/all-ca.crt"]
|
||||
capture_output = True
|
||||
|
||||
proc = subprocess.run([mosq_test.get_build_root()+"/apps/mosquitto_ctrl/mosquitto_ctrl"]
|
||||
+ opts + [cmd] + args,
|
||||
env=env, capture_output=True, encoding='utf-8')
|
||||
|
||||
if response is not None:
|
||||
if proc.stdout != response:
|
||||
raise ValueError(proc.stdout)
|
||||
|
||||
if proc.returncode != 0:
|
||||
raise ValueError(args)
|
||||
|
||||
|
||||
rc = 0
|
||||
ports = mosq_test.get_port(2)
|
||||
conf_file = os.path.basename(__file__).replace('.py', '.conf')
|
||||
write_config(conf_file, ports)
|
||||
|
||||
env = dict(os.environ)
|
||||
env['LD_LIBRARY_PATH'] = mosq_test.get_build_root() + '/lib'
|
||||
|
||||
if not os.path.exists(str(ports[0])):
|
||||
os.mkdir(str(ports[0]))
|
||||
|
||||
# Generate initial dynsec file
|
||||
ctrl_cmd("dynsec", ["init", f"{ports[0]}/dynamic-security.json", "admin", "admin"], ports)
|
||||
ctrl_cmd("broker", ["help"], ports)
|
||||
|
||||
# Then start broker
|
||||
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=ports[0])
|
||||
|
||||
try:
|
||||
ctrl_cmd("dynsec", ["addRoleACL", "admin", "publishClientSend", "$CONTROL/#", "allow"], ports)
|
||||
ctrl_cmd("dynsec", ["addRoleACL", "admin", "publishClientReceive", "$CONTROL/#", "allow"], ports)
|
||||
ctrl_cmd("dynsec", ["addRoleACL", "admin", "subscribePattern", "$CONTROL/#", "allow"], ports)
|
||||
|
||||
ctrl_cmd("broker", ["listListeners"], ports, response=f"Listener 1:\n Port: {ports[0]}\n Protocol: mqtt\n TLS: false\n\nListener 1:\n Port: {ports[1]}\n Protocol: mqtt\n TLS: true\n\n")
|
||||
|
||||
ctrl_cmd("broker", ["listPlugins"], ports, response="Plugin: dynamic-security\nControl endpoints: $CONTROL/dynamic-security/v1\n")
|
||||
|
||||
rc = 0
|
||||
except mosq_test.TestError:
|
||||
pass
|
||||
except Exception as err:
|
||||
print(err)
|
||||
finally:
|
||||
os.remove(conf_file)
|
||||
try:
|
||||
os.remove(f"{ports[0]}/dynamic-security.json")
|
||||
pass
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
shutil.rmtree(f"{ports[0]}")
|
||||
broker.terminate()
|
||||
if mosq_test.wait_for_subprocess(broker):
|
||||
print("broker not terminated")
|
||||
if rc == 0: rc=1
|
||||
(_, stde) = broker.communicate()
|
||||
if rc:
|
||||
print(stde.decode('utf-8'))
|
||||
|
||||
|
||||
exit(rc)
|
@ -0,0 +1,232 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from mosq_test_helper import *
|
||||
import json
|
||||
import shutil
|
||||
|
||||
def write_config(filename, ports):
|
||||
with open(filename, 'w') as f:
|
||||
f.write(f"global_plugin {mosq_test.get_build_root()}/plugins/dynamic-security/mosquitto_dynamic_security.so\n")
|
||||
f.write(f"plugin_opt_config_file {ports[0]}/dynamic-security.json\n")
|
||||
f.write("allow_anonymous false\n")
|
||||
f.write(f"listener {ports[0]}\n")
|
||||
f.write(f"listener {ports[1]}\n")
|
||||
f.write(f"certfile {mosq_test.get_build_root()}/test/ssl/server.crt\n")
|
||||
f.write(f"keyfile {mosq_test.get_build_root()}/test/ssl/server.key\n")
|
||||
|
||||
def ctrl_dynsec_cmd(args, ports, response=None, input=None):
|
||||
opts = ["-u", "admin",
|
||||
"-P", "newadmin",]
|
||||
|
||||
if response is None:
|
||||
opts += [
|
||||
"-p", str(ports[0]),
|
||||
"-q", "1"
|
||||
]
|
||||
else:
|
||||
opts += ["-p", str(ports[1])]
|
||||
opts += ["--cafile", f"{mosq_test.get_build_root()}/test/ssl/all-ca.crt"]
|
||||
|
||||
proc = subprocess.run([mosq_test.get_build_root()+"/apps/mosquitto_ctrl/mosquitto_ctrl"]
|
||||
+ opts + ["dynsec"] + args,
|
||||
env=env, capture_output=True, encoding='utf-8', timeout=2, input=input)
|
||||
|
||||
if response is not None:
|
||||
if proc.stdout != response:
|
||||
print(len(proc.stdout))
|
||||
print(len(response))
|
||||
raise ValueError(proc.stdout)
|
||||
|
||||
if proc.returncode != 0:
|
||||
raise ValueError(args)
|
||||
|
||||
def ctrl_dynsec_file_cmd(args, ports, response=None):
|
||||
opts = ["-f", f"{ports[0]}/dynamic-security.json"]
|
||||
|
||||
proc = subprocess.run([mosq_test.get_build_root()+"/apps/mosquitto_ctrl/mosquitto_ctrl"]
|
||||
+ opts + ["dynsec"] + args,
|
||||
env=env, capture_output=True, encoding='utf-8')
|
||||
|
||||
if response is not None:
|
||||
if proc.stdout != response:
|
||||
raise ValueError(proc.stdout)
|
||||
|
||||
if proc.returncode != 0:
|
||||
raise ValueError(args)
|
||||
|
||||
|
||||
|
||||
ports = mosq_test.get_port(2)
|
||||
conf_file = os.path.basename(__file__).replace('.py', '.conf')
|
||||
write_config(conf_file, ports)
|
||||
|
||||
env = dict(os.environ)
|
||||
env['LD_LIBRARY_PATH'] = mosq_test.get_build_root() + '/lib'
|
||||
|
||||
if not os.path.exists(str(ports[0])):
|
||||
os.mkdir(str(ports[0]))
|
||||
|
||||
# Generate initial dynsec file
|
||||
ctrl_dynsec_cmd(["init", f"{ports[0]}/dynamic-security.json", "admin", "admin"], ports)
|
||||
|
||||
ctrl_dynsec_file_cmd(["help"], ports) # get the help, don't check the response though
|
||||
ctrl_dynsec_file_cmd(["setClientPassword", "admin", "newadmin", "-i", "10000"], ports)
|
||||
|
||||
# Then start broker
|
||||
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=ports[0], nolog=True)
|
||||
|
||||
try:
|
||||
rc = 1
|
||||
|
||||
# Set default access to opposite of normal
|
||||
ctrl_dynsec_cmd(["setDefaultACLAccess", "publishClientSend", "allow"], ports)
|
||||
ctrl_dynsec_cmd(["setDefaultACLAccess", "publishClientReceive", "deny"], ports)
|
||||
ctrl_dynsec_cmd(["setDefaultACLAccess", "subscribe", "allow"], ports)
|
||||
ctrl_dynsec_cmd(["setDefaultACLAccess", "unsubscribe", "deny"], ports)
|
||||
|
||||
# Verify
|
||||
ctrl_dynsec_cmd(["getDefaultACLAccess"], ports, response="publishClientSend : allow\npublishClientReceive : deny\nsubscribe : allow\nunsubscribe : deny\n")
|
||||
|
||||
# Create clients
|
||||
ctrl_dynsec_cmd(["createClient", "username1", "-p", "password1"], ports) # password, no client id
|
||||
ctrl_dynsec_cmd(["createClient", "username2", "-p", "password2", "-c", "clientid2"], ports) # password and client id
|
||||
ctrl_dynsec_cmd(["createClient", "username3"], ports, input="pw\npw\n") # password, no client id
|
||||
ctrl_dynsec_cmd(["createClient", "username4"], ports, input="\n\n") # password, no client id
|
||||
ctrl_dynsec_cmd(["createClient", "username5"], ports, input="not\nmatching\n")
|
||||
|
||||
# List clients
|
||||
ctrl_dynsec_cmd(["listClients"], ports, response="admin\nusername1\nusername2\nusername3\nusername4\n")
|
||||
ctrl_dynsec_cmd(["listClients", "1"], ports, response="admin\n") # with count
|
||||
ctrl_dynsec_cmd(["listClients", "1", "1"], ports, response="username1\n") # with count, offset
|
||||
|
||||
# Create groups
|
||||
ctrl_dynsec_cmd(["createGroup", "group1"], ports)
|
||||
ctrl_dynsec_cmd(["createGroup", "group2"], ports)
|
||||
ctrl_dynsec_cmd(["createGroup", "group3"], ports)
|
||||
|
||||
#List groups
|
||||
ctrl_dynsec_cmd(["listGroups"], ports, response="group1\ngroup2\ngroup3\n")
|
||||
ctrl_dynsec_cmd(["listGroups", "1"], ports, response="group1\n")
|
||||
ctrl_dynsec_cmd(["listGroups", "1", "1"], ports, response="group2\n")
|
||||
|
||||
# Add client to group
|
||||
ctrl_dynsec_cmd(["addGroupClient", "group1", "username1", "10"], ports)
|
||||
|
||||
# Get anonymous group
|
||||
ctrl_dynsec_cmd(["getAnonymousGroup"], ports, response="\n")
|
||||
|
||||
# Set anon as anonymous group
|
||||
ctrl_dynsec_cmd(["setAnonymousGroup", "group2"], ports)
|
||||
|
||||
# Verify
|
||||
ctrl_dynsec_cmd(["getAnonymousGroup"], ports, response="group2\n")
|
||||
|
||||
# Create roles
|
||||
ctrl_dynsec_cmd(["createRole", "role1"], ports)
|
||||
ctrl_dynsec_cmd(["createRole", "role2"], ports)
|
||||
ctrl_dynsec_cmd(["createRole", "role3"], ports)
|
||||
#Delete a role: deleteRole <rolename>
|
||||
|
||||
ctrl_dynsec_cmd(["deleteRole", "role3"], ports) # repeat with count, offset
|
||||
|
||||
# Add a role to a client
|
||||
ctrl_dynsec_cmd(["addClientRole", "username1", "role1", "20"], ports)
|
||||
|
||||
# Add a role to a group
|
||||
ctrl_dynsec_cmd(["addGroupRole", "group1", "role2", "15"], ports)
|
||||
|
||||
ctrl_dynsec_cmd(["getGroup", "group1"], ports) # repeat with count, offset
|
||||
|
||||
# Add ACLs
|
||||
ctrl_dynsec_cmd(["addRoleACL", "role1", "publishClientSend", "#", "allow", "1"], ports)
|
||||
ctrl_dynsec_cmd(["addRoleACL", "role1", "publishClientReceive", "#", "allow", "2"], ports)
|
||||
ctrl_dynsec_cmd(["addRoleACL", "role1", "subscribeLiteral", "#", "allow", "1"], ports)
|
||||
ctrl_dynsec_cmd(["addRoleACL", "role1", "subscribePattern", "#", "allow", "2"], ports)
|
||||
ctrl_dynsec_cmd(["addRoleACL", "role1", "unsubscribeLiteral", "#", "deny", "1"], ports)
|
||||
ctrl_dynsec_cmd(["addRoleACL", "role1", "unsubscribePattern", "#", "deny", "2"], ports)
|
||||
ctrl_dynsec_cmd(["addRoleACL", "role2", "publishClientSend", "#", "allow", "3"], ports)
|
||||
ctrl_dynsec_cmd(["addRoleACL", "role2", "publishClientReceive", "#", "allow"], ports)
|
||||
|
||||
# List roles
|
||||
ctrl_dynsec_cmd(["listRoles"], ports, response="admin\nrole1\nrole2\n")
|
||||
ctrl_dynsec_cmd(["listRoles", "1"], ports, response="admin\n")
|
||||
ctrl_dynsec_cmd(["listRoles", "1", "1"], ports, response="role1\n")
|
||||
|
||||
# Get role
|
||||
ctrl_dynsec_cmd(["getRole", "role1"], ports, response="Rolename: role1\nACLs: publishClientSend : allow : # (priority: 1)\n publishClientReceive : allow : # (priority: 2)\n subscribeLiteral : allow : # (priority: 1)\n subscribePattern : allow : # (priority: 2)\n unsubscribeLiteral : deny : # (priority: 1)\n unsubscribePattern : deny : # (priority: 2)\n")
|
||||
|
||||
# Get client
|
||||
ctrl_dynsec_cmd(["getClient", "username1"], ports, response="Username: username1\nClientid:\nRoles: role1 (priority: 20)\nGroups: group1 (priority: 10)\n")
|
||||
|
||||
# Disable client
|
||||
ctrl_dynsec_cmd(["disableClient", "username1"], ports)
|
||||
|
||||
# Verify client
|
||||
ctrl_dynsec_cmd(["getClient", "username1"], ports, response="Username: username1\nClientid:\nDisabled: true\nRoles: role1 (priority: 20)\nGroups: group1 (priority: 10)\n")
|
||||
|
||||
# Set clientid
|
||||
ctrl_dynsec_cmd(["setClientID", "username1", "fixed-id"], ports)
|
||||
|
||||
# Verify client
|
||||
ctrl_dynsec_cmd(["getClient", "username1"], ports, response="Username: username1\nClientid: fixed-id\nDisabled: true\nRoles: role1 (priority: 20)\nGroups: group1 (priority: 10)\n")
|
||||
|
||||
# Clear clientid
|
||||
ctrl_dynsec_cmd(["setClientID", "username1"], ports)
|
||||
|
||||
# Enable client
|
||||
ctrl_dynsec_cmd(["enableClient", "username1"], ports)
|
||||
|
||||
# Verify client
|
||||
ctrl_dynsec_cmd(["getClient", "username1"], ports, response="Username: username1\nClientid:\nRoles: role1 (priority: 20)\nGroups: group1 (priority: 10)\n")
|
||||
|
||||
# Set client password
|
||||
ctrl_dynsec_cmd(["setClientPassword", "username1", "new-password"], ports)
|
||||
ctrl_dynsec_cmd(["setClientPassword", "username1"], ports, input="not\nmatch\n")
|
||||
|
||||
# Remove an ACL
|
||||
ctrl_dynsec_cmd(["removeRoleACL", "role1", "publishClientReceive", "#"], ports)
|
||||
ctrl_dynsec_cmd(["getRole", "role1"], ports, response="Rolename: role1\nACLs: publishClientSend : allow : # (priority: 1)\n subscribeLiteral : allow : # (priority: 1)\n subscribePattern : allow : # (priority: 2)\n unsubscribeLiteral : deny : # (priority: 1)\n unsubscribePattern : deny : # (priority: 2)\n")
|
||||
|
||||
ctrl_dynsec_cmd(["removeRoleACL", "role1", "publishClientSend", "#"], ports)
|
||||
ctrl_dynsec_cmd(["getRole", "role1"], ports, response="Rolename: role1\nACLs: subscribeLiteral : allow : # (priority: 1)\n subscribePattern : allow : # (priority: 2)\n unsubscribeLiteral : deny : # (priority: 1)\n unsubscribePattern : deny : # (priority: 2)\n")
|
||||
|
||||
ctrl_dynsec_cmd(["removeRoleACL", "role1", "subscribeLiteral", "#"], ports)
|
||||
ctrl_dynsec_cmd(["removeRoleACL", "role1", "subscribePattern", "#"], ports)
|
||||
ctrl_dynsec_cmd(["removeRoleACL", "role1", "unsubscribeLiteral", "#"], ports)
|
||||
ctrl_dynsec_cmd(["removeRoleACL", "role1", "unsubscribePattern", "#"], ports)
|
||||
ctrl_dynsec_cmd(["getRole", "role1"], ports, response="Rolename: role1\n")
|
||||
|
||||
# Remove a Role
|
||||
ctrl_dynsec_cmd(["deleteRole", "role2"], ports)
|
||||
|
||||
# Remove client from a group
|
||||
ctrl_dynsec_cmd(["removeGroupClient", "group1", "username1"], ports)
|
||||
|
||||
# Remove role from a group
|
||||
ctrl_dynsec_cmd(["removeGroupRole", "group1", "role2"], ports)
|
||||
|
||||
# Delete group
|
||||
ctrl_dynsec_cmd(["deleteGroup", "group1"], ports)
|
||||
|
||||
ctrl_dynsec_cmd(["removeClientRole", "username1", "role1"], ports)
|
||||
|
||||
# Delete client
|
||||
ctrl_dynsec_cmd(["deleteClient", "username1"], ports)
|
||||
|
||||
rc = 0
|
||||
except mosq_test.TestError:
|
||||
pass
|
||||
finally:
|
||||
os.remove(conf_file)
|
||||
try:
|
||||
os.remove(f"{ports[0]}/dynamic-security.json")
|
||||
pass
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
shutil.rmtree(f"{ports[0]}")
|
||||
broker.terminate()
|
||||
if mosq_test.wait_for_subprocess(broker):
|
||||
print("broker not terminated")
|
||||
if rc == 0: rc=1
|
||||
|
||||
exit(rc)
|
@ -0,0 +1,5 @@
|
||||
# mosquitto_ctrl
|
||||
|
||||
The tests in 02-ctrl-dynsec.py test the operation of the mosquitto_ctrl dynsec
|
||||
module by using the output of mosquitto_ctrl. The tests should be extended by
|
||||
connecting clients to ensure that auth and ACLs are being set correctly.
|
Loading…
Reference in New Issue