You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1220 lines
41 KiB
Python
1220 lines
41 KiB
Python
import atexit
|
|
import base64
|
|
import errno
|
|
import hashlib
|
|
import os
|
|
import socket
|
|
import subprocess
|
|
import struct
|
|
import sys
|
|
import time
|
|
import uuid
|
|
|
|
import traceback
|
|
|
|
import mqtt5_props
|
|
|
|
import __main__
|
|
|
|
from pathlib import Path
|
|
# Sequence number that start_broker() appends to each valgrind log file name.
vg_index = 1
# Names of the valgrind log files that start_broker() has requested so far.
vg_logfiles = []
|
|
|
|
class TestError(Exception):
    """Raised when a received packet does not match the expected packet.

    The text is kept on ``self.message`` and is also handed to ``Exception``
    so that ``str(err)`` and tracebacks show it.
    """

    def __init__(self, message="Mismatched packets"):
        super().__init__(message)
        self.message = message
|
|
|
|
def get_build_root():
    """Return the root of the build tree as a string.

    The ``BUILD_ROOT`` environment variable is used when it is set; otherwise
    the parent of the directory that contains this file is returned.
    """
    configured = os.environ.get("BUILD_ROOT")
    if configured is not None:
        return configured
    return str(Path(__file__).resolve().parents[1])
|
|
|
|
def listen_sock(port):
    """Return a TCP server socket bound to *port* on all interfaces.

    The socket has SO_REUSEADDR set, a 10 second timeout and a listen backlog
    of 5.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.settimeout(10)
    server.bind(('', port))
    server.listen(5)
    return server
|
|
|
|
def start_broker(filename, cmd=None, port=0, use_conf=False, expect_fail=False, nolog=False, checkhost="localhost", env=None, check_port=True):
    """Launch a broker process and wait until it accepts TCP connections.

    filename    -- name of the calling test script; ``.py`` is replaced by
                   ``.conf`` to find the config file, and the name prefixes
                   any valgrind log file.
    cmd         -- explicit command list, or None to build a default one.
    port        -- port to probe (and to pass with ``-p``); 0 means 1888.
    use_conf    -- always run the broker with the derived config file.
    expect_fail -- return None instead of raising when the broker never
                   becomes reachable.
    nolog       -- discard the broker's stderr instead of capturing it.
    checkhost   -- host used for the readiness probe.
    env         -- environment passed to the child process.
    check_port  -- when false, return right after spawning the process.

    The ``MOSQ_USE_VALGRIND`` environment variable wraps the command in
    valgrind (``callgrind``, ``massif``, any other value: leak check) or in
    ``fg-helper`` (``failgrind``) and lengthens the probe interval.

    Returns the ``subprocess.Popen`` object, or None when the broker did not
    come up and *expect_fail* is set.  Raises IOError when the broker did not
    come up and *expect_fail* is not set; probe errors other than
    ECONNREFUSED propagate unchanged.
    """
    global vg_index

    conf_file = filename.replace('.py', '.conf')
    if use_conf:
        cmd = [get_build_root() + '/src/mosquitto', '-v', '-c', conf_file]
    elif cmd is None:
        if port != 0:
            cmd = [get_build_root() + '/src/mosquitto', '-v', '-p', str(port)]
        else:
            cmd = [get_build_root() + '/src/mosquitto', '-v', '-c', conf_file]
    if port == 0:
        port = 1888

    delay = 0.1
    vg_mode = os.environ.get('MOSQ_USE_VALGRIND')
    if vg_mode is not None:
        logfile = filename + '.' + str(vg_index) + '.vglog'
        if vg_mode == 'callgrind':
            prefix = ['valgrind', '-q', '--tool=callgrind', '--log-file=' + logfile]
        elif vg_mode == 'massif':
            prefix = ['valgrind', '-q', '--tool=massif', '--log-file=' + logfile]
        elif vg_mode == 'failgrind':
            prefix = ['fg-helper']
        else:
            prefix = ['valgrind', '-q', '--track-fds=yes', '--trace-children=yes', '--leak-check=full', '--show-leak-kinds=all', '--log-file=' + logfile]
        cmd = prefix + cmd
        vg_logfiles.append(logfile)
        vg_index += 1
        # Instrumented brokers start slowly, so probe less aggressively.
        delay = 1

    stderr_target = subprocess.DEVNULL if nolog else subprocess.PIPE
    broker = subprocess.Popen(cmd, stderr=stderr_target, env=env)

    if not check_port:
        return broker

    # Poll until the listener accepts a connection.
    for _ in range(20):
        time.sleep(delay)
        try:
            probe = socket.create_connection((checkhost, port))
        except socket.error as err:
            if err.errno != errno.ECONNREFUSED:
                raise
            continue
        probe.close()
        return broker

    if expect_fail:
        return None

    try:
        _, errs = broker.communicate(timeout=1)
    except subprocess.TimeoutExpired:
        # The process is alive but unreachable: stop it so that the caller
        # gets the documented IOError and no stray broker is left behind.
        broker.kill()
        _, errs = broker.communicate()
    print("FAIL: unable to start broker: %s" % errs)
    raise IOError
|
|
|
|
def start_client(filename, cmd, env=None):
    """Spawn a test client and return its ``subprocess.Popen`` object.

    *cmd* is the command list; None raises ValueError.  When *env* is None a
    copy of the current environment is used with ``LD_LIBRARY_PATH`` pointing
    at the build tree's ``lib`` and ``lib/cpp`` directories.  When
    ``MOSQ_USE_VALGRIND`` is set the command runs under valgrind, logging to
    ``<filename>.vglog``.  The client's stderr is merged into its piped stdout.
    """
    if cmd is None:
        raise ValueError

    if env is None:
        env = dict(os.environ)
        env['LD_LIBRARY_PATH'] = get_build_root() + '/lib:' + get_build_root() + '/lib/cpp'

    if os.environ.get('MOSQ_USE_VALGRIND') is not None:
        cmd = ['valgrind', '-q', '--log-file=' + filename + '.vglog'] + cmd

    return subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
|
|
|
def wait_for_subprocess(client,timeout=10,terminate_timeout=2):
    """Wait for *client* to exit, escalating to terminate() and then kill().

    Returns 0 when the process exited within *timeout*, 1 when it exited
    after terminate(), 2 when it exited after kill(), and 3 when it was still
    running *terminate_timeout* seconds after kill().
    """
    try:
        client.wait(timeout)
        return 0
    except subprocess.TimeoutExpired:
        client.terminate()

    try:
        client.wait(terminate_timeout)
        return 1
    except subprocess.TimeoutExpired:
        client.kill()

    try:
        client.wait(terminate_timeout)
        return 2
    except subprocess.TimeoutExpired:
        return 3
|
|
|
|
|
|
def terminate_broker(broker, timeout=10):
    """Stop *broker* and return ``(rc, stderr_output)``.

    The process is sent terminate() and given *timeout* seconds to exit while
    its output is drained.  If it is still running after that it is killed.
    rc is 0 when terminate() was enough and 1 when kill() was needed.

    The wait is bounded because communicate() without a timeout blocks for
    as long as the process ignores the termination request.
    """
    broker.terminate()
    try:
        (_, stde) = broker.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        print("broker not terminated")
        broker.kill()
        (_, stde) = broker.communicate()
        return (1, stde)
    return (0, stde)
|
|
|
|
|
|
def pub_helper(port, proto_ver=4):
    """Connect a client with id "pub-helper" to *port* and return its socket.

    A CONNACK with rc=0 is expected in reply to the CONNECT.
    """
    return do_client_connect(
        gen_connect("pub-helper", proto_ver=proto_ver),
        gen_connack(rc=0, proto_ver=proto_ver),
        port=port,
        connack_error="pub helper connack",
    )
|
|
|
|
|
|
def sub_helper(port, topic='#', qos=0, proto_ver=4):
    """Connect a client with id "sub-helper", subscribe it, return its socket.

    The client subscribes to *topic* at *qos* using message id 1 and the
    matching SUBACK is expected in reply.
    """
    mid = 1
    packets = {
        "connect": gen_connect("sub-helper", proto_ver=proto_ver),
        "connack": gen_connack(rc=0, proto_ver=proto_ver),
        "subscribe": gen_subscribe(mid=mid, topic=topic, qos=qos, proto_ver=proto_ver),
        "suback": gen_suback(mid=mid, qos=qos, proto_ver=proto_ver),
    }

    sock = do_client_connect(packets["connect"], packets["connack"], port=port)
    do_send_receive(sock, packets["subscribe"], packets["suback"], "sub helper suback")
    return sock
|
|
|
|
|
|
def expect_packet(sock, name, expected):
    """Read from *sock* and check that the bytes equal *expected*.

    Exactly ``len(expected)`` bytes are requested.  For an empty *expected*
    one byte is requested, so nothing may arrive before the socket times out.
    A socket timeout ends the read early and the partial data is compared.

    Returns True on a match.  Raises TestError on a mismatch and
    BrokenPipeError when the peer closes the connection while reading.
    """
    wanted = len(expected) if len(expected) > 0 else 1

    received = b""
    try:
        while len(received) < wanted:
            chunk = sock.recv(wanted - len(received))
            if len(chunk) == 0:
                raise BrokenPipeError(f"when reading {name} from {sock.getpeername()}")
            received += chunk
    except socket.timeout:
        pass

    if not packet_matches(name, received, expected):
        raise TestError
    return True
|
|
|
|
|
|
def packet_matches(name, recvd, expected):
    """Return True when *recvd* equals *expected*, otherwise report and return False.

    On a mismatch both packets are printed, decoded with to_string() when
    possible and raw otherwise, followed by the current stack on stdout.
    """
    if recvd == expected:
        return True

    print("FAIL: Received incorrect "+name+".")
    for label, pkt in (("Received", recvd), ("Expected", expected)):
        try:
            print(label + ": " + to_string(pkt))
        except struct.error:
            print(label + " (not decoded, len=%d): %s" % (len(pkt), pkt))
    traceback.print_stack(file=sys.stdout)

    return False
|
|
|
|
|
|
def receive_unordered(sock, recv1_packet, recv2_packet, error_string):
    """Expect *recv1_packet* and *recv2_packet* back to back, in either order.

    Bytes are read one at a time until the combined length has arrived.
    Returns None on success.  Raises ValueError(error_string) when the peer
    closes the connection early or the data matches neither ordering; in the
    latter case the mismatch is reported first.
    """
    orderings = (recv1_packet + recv2_packet, recv2_packet + recv1_packet)
    needed = len(orderings[0])

    buf = b''
    while len(buf) < needed:
        piece = sock.recv(1)
        if len(piece) == 0:
            raise ValueError(error_string)
        buf += piece

    if buf in orderings:
        return

    packet_matches(error_string, buf, orderings[1])
    raise ValueError(error_string)
|
|
|
|
|
|
def do_send_receive(sock, send_packet, receive_packet, error_string="send receive error"):
    """Send *send_packet* on *sock*, then expect *receive_packet* in reply.

    Returns *sock* on success.  Raises RuntimeError when send() reports that
    no bytes were written; a failure from expect_packet() propagates.
    """
    offset = 0
    while offset < len(send_packet):
        written = sock.send(send_packet[offset:])
        if written == 0:
            raise RuntimeError("socket connection broken")
        offset += written

    if not expect_packet(sock, error_string, receive_packet):
        sock.close()
        raise ValueError
    return sock
|
|
|
|
|
|
# Useful for mocking a client receiving (with ack) a qos1 publish
def do_receive_send(sock, receive_packet, send_packet, error_string="receive send error"):
    """Expect *receive_packet* on *sock*, then send *send_packet*.

    Returns *sock* on success.  Raises RuntimeError when send() reports that
    no bytes were written; a failure from expect_packet() propagates.
    """
    if not expect_packet(sock, error_string, receive_packet):
        sock.close()
        raise ValueError

    offset = 0
    while offset < len(send_packet):
        written = sock.send(send_packet[offset:])
        if written == 0:
            raise RuntimeError("socket connection broken")
        offset += written
    return sock
|
|
|
|
|
|
def client_connect_only(hostname="localhost", port=1888, timeout=10, protocol="mqtt"):
    """Open a connection to ``hostname:port`` without sending any MQTT packet.

    With ``protocol="websockets"`` the TCP socket is wrapped in a
    WebsocketWrapper using the ``/mqtt`` path; any other value yields a plain
    TCP socket.  *timeout* is applied to the socket in both cases.
    """
    if protocol != "websockets":
        plain = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        plain.settimeout(timeout)
        plain.connect((hostname, port))
        return plain

    raw = socket.create_connection((hostname, port), timeout=timeout)
    raw.settimeout(timeout)
    return WebsocketWrapper(raw, hostname, port, False, "/mqtt", None)
|
|
|
|
def client_connect_only_unix(path, timeout=10):
    """Open a stream connection to the unix domain socket at *path*.

    The returned socket has *timeout* applied; no MQTT packet is sent.
    """
    conn = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    conn.settimeout(timeout)
    conn.connect(path)
    return conn
|
|
|
|
def do_client_connect(connect_packet, connack_packet, hostname="localhost", port=1888, timeout=10, connack_error="connack", protocol="mqtt"):
    """Connect to ``hostname:port``, send *connect_packet*, expect *connack_packet*.

    Returns the connected socket; *connack_error* names the reply in failure
    reports.
    """
    conn = client_connect_only(hostname, port, timeout, protocol)
    return do_send_receive(conn, connect_packet, connack_packet, connack_error)
|
|
|
|
|
|
def do_client_connect_unix(connect_packet, connack_packet, path, timeout=10, connack_error="connack"):
    """Connect to the unix socket at *path*, send *connect_packet*, expect *connack_packet*.

    Returns the connected socket; *connack_error* names the reply in failure
    reports.
    """
    conn = client_connect_only_unix(path, timeout)
    return do_send_receive(conn, connect_packet, connack_packet, connack_error)
|
|
|
|
|
|
def remaining_length(packet):
    """Strip the fixed header from *packet* and decode its remaining length.

    The byte at index 0 is the command byte; the variable-length integer
    that follows occupies up to four bytes (indices 1-4).

    Returns ``(rest, rl)`` where *rest* is the packet after the fixed header
    and *rl* the decoded length.  When no terminating length byte is found
    within the first five bytes the packet is returned unstripped together
    with whatever value was accumulated.
    """
    header = packet[:5]
    rl = 0
    mult = 1
    # Every byte after the command byte is a candidate length byte, including
    # the last one of the header slice.
    for i in range(1, len(header)):
        byte = header[i]
        rl += (byte & 127) * mult
        mult *= 128
        if byte & 128 == 0:
            return (packet[i+1:], rl)

    return (packet, rl)
|
|
|
|
|
|
def to_hex_string(packet):
    """Return *packet* as space-separated ``hex()`` values, e.g. ``"0x10 0x2 "``.

    Every byte is followed by one space, so a non-empty result ends with a
    space.  An empty packet gives an empty string.
    """
    # Iterating a bytes object yields ints, so no struct unpacking is needed.
    return "".join(hex(byte) + " " for byte in bytes(packet))
|
|
|
|
|
|
def _split_uint16(packet):
    """Split a big-endian 16-bit integer off the front of *packet*.

    Returns ``(value, rest)``; raises struct.error when *packet* is too short.
    """
    return struct.unpack("!H%ds" % (len(packet) - 2), packet)


def _split_prefixed(packet):
    """Split a field with a 16-bit length prefix off the front of *packet*.

    Returns ``(field, rest)``; raises struct.error when *packet* is truncated.
    """
    (flen, packet) = _split_uint16(packet)
    return struct.unpack("!%ds%ds" % (flen, len(packet) - flen), packet)


def to_string(packet):
    """Return a human readable description of an MQTT packet.

    The packet type is taken from the upper four bits of the first byte.
    Byte-string fields are rendered with ``str()``, so they appear as
    ``b'...'``.  An empty packet gives an empty string.  Malformed packets
    raise struct.error, which callers use to fall back to a raw dump.
    """
    if len(packet) == 0:
        return ""

    packet0 = bytes(packet)[0]
    cmd = packet0 & 0xF0
    if cmd == 0x00:
        # Reserved
        return "0x00"
    elif cmd == 0x10:
        # CONNECT
        (packet, rl) = remaining_length(packet)
        (slen, packet) = _split_uint16(packet)
        pack_format = "!%dsBBH%ds" % (slen, len(packet) - slen - 4)
        (protocol, proto_ver, flags, keepalive, packet) = struct.unpack(pack_format, packet)
        s = "CONNECT, proto="+str(protocol)+str(proto_ver)+", keepalive="+str(keepalive)
        if flags & 2:
            s += ", clean-session"
        else:
            s += ", durable"

        (client_id, packet) = _split_prefixed(packet)
        s += ", id="+str(client_id)

        if flags & 4:
            (will_topic, packet) = _split_prefixed(packet)
            s += ", will-topic="+str(will_topic)

            (will_message, packet) = _split_prefixed(packet)
            # will_message is bytes, so it must be converted before joining.
            s += ", will-message="+str(will_message)

            s += ", will-qos="+str((flags & 24) >> 3)
            s += ", will-retain="+str((flags & 32) >> 5)

        if flags & 128:
            (username, packet) = _split_prefixed(packet)
            s += ", username="+str(username)

        if flags & 64:
            (password, packet) = _split_prefixed(packet)
            s += ", password="+str(password)

        if flags & 1:
            s += ", reserved=1"

        return s
    elif cmd == 0x20:
        # CONNACK
        if len(packet) == 4:
            (cmd, rl, resv, rc) = struct.unpack('!BBBB', packet)
            return "CONNACK, rl="+str(rl)+", res="+str(resv)+", rc="+str(rc)
        elif len(packet) == 5:
            (cmd, rl, flags, reason_code, proplen) = struct.unpack('!BBBBB', packet)
            return "CONNACK, rl="+str(rl)+", flags="+str(flags)+", rc="+str(reason_code)+", proplen="+str(proplen)
        else:
            return "CONNACK, (not decoded)"
    elif cmd == 0x30:
        # PUBLISH
        dup = (packet0 & 0x08) >> 3
        qos = (packet0 & 0x06) >> 1
        retain = (packet0 & 0x01)
        (packet, rl) = remaining_length(packet)
        (topic, packet) = _split_prefixed(packet)
        s = "PUBLISH, rl="+str(rl)+", topic="+str(topic)+", qos="+str(qos)+", retain="+str(retain)+", dup="+str(dup)
        if qos > 0:
            (mid, packet) = _split_uint16(packet)
            s += ", mid="+str(mid)

        s += ", payload="+str(packet)
        return s
    elif cmd == 0x40:
        # PUBACK
        if len(packet) == 5:
            (cmd, rl, mid, reason_code) = struct.unpack('!BBHB', packet)
            return "PUBACK, rl="+str(rl)+", mid="+str(mid)+", reason_code="+str(reason_code)
        else:
            (cmd, rl, mid) = struct.unpack('!BBH', packet)
            return "PUBACK, rl="+str(rl)+", mid="+str(mid)
    elif cmd == 0x50:
        # PUBREC
        if len(packet) == 5:
            (cmd, rl, mid, reason_code) = struct.unpack('!BBHB', packet)
            return "PUBREC, rl="+str(rl)+", mid="+str(mid)+", reason_code="+str(reason_code)
        else:
            (cmd, rl, mid) = struct.unpack('!BBH', packet)
            return "PUBREC, rl="+str(rl)+", mid="+str(mid)
    elif cmd == 0x60:
        # PUBREL
        dup = (packet0 & 0x08) >> 3
        (cmd, rl, mid) = struct.unpack('!BBH', packet)
        return "PUBREL, rl="+str(rl)+", mid="+str(mid)+", dup="+str(dup)
    elif cmd == 0x70:
        # PUBCOMP
        (cmd, rl, mid) = struct.unpack('!BBH', packet)
        return "PUBCOMP, rl="+str(rl)+", mid="+str(mid)
    elif cmd == 0x80:
        # SUBSCRIBE
        (packet, rl) = remaining_length(packet)
        (mid, packet) = _split_uint16(packet)
        s = "SUBSCRIBE, rl="+str(rl)+", mid="+str(mid)
        topic_index = 0
        while len(packet) > 0:
            (tlen, packet) = _split_uint16(packet)
            pack_format = "!%dsB%ds" % (tlen, len(packet) - tlen - 1)
            (topic, qos, packet) = struct.unpack(pack_format, packet)
            s += ", topic"+str(topic_index)+"="+str(topic)+","+str(qos)
            topic_index += 1
        return s
    elif cmd == 0x90:
        # SUBACK
        (packet, rl) = remaining_length(packet)
        (mid, packet) = _split_uint16(packet)
        granted_qos = struct.unpack("!" + "B"*len(packet), packet)

        # List every granted QoS value, the last one included.
        return "SUBACK, rl="+str(rl)+", mid="+str(mid)+", granted_qos="+", ".join(str(q) for q in granted_qos)
    elif cmd == 0xA0:
        # UNSUBSCRIBE
        (packet, rl) = remaining_length(packet)
        (mid, packet) = _split_uint16(packet)
        s = "UNSUBSCRIBE, rl="+str(rl)+", mid="+str(mid)
        topic_index = 0
        while len(packet) > 0:
            (topic, packet) = _split_prefixed(packet)
            s += ", topic"+str(topic_index)+"="+str(topic)
            topic_index += 1
        return s
    elif cmd == 0xB0:
        # UNSUBACK
        (cmd, rl, mid) = struct.unpack('!BBH', packet)
        return "UNSUBACK, rl="+str(rl)+", mid="+str(mid)
    elif cmd == 0xC0:
        # PINGREQ
        (cmd, rl) = struct.unpack('!BB', packet)
        return "PINGREQ, rl="+str(rl)
    elif cmd == 0xD0:
        # PINGRESP
        (cmd, rl) = struct.unpack('!BB', packet)
        return "PINGRESP, rl="+str(rl)
    elif cmd == 0xE0:
        # DISCONNECT
        if len(packet) == 3:
            (cmd, rl, reason_code) = struct.unpack('!BBB', packet)
            return "DISCONNECT, rl="+str(rl)+", reason_code="+str(reason_code)
        else:
            (cmd, rl) = struct.unpack('!BB', packet)
            return "DISCONNECT, rl="+str(rl)
    elif cmd == 0xF0:
        # AUTH
        (cmd, rl) = struct.unpack('!BB', packet)
        return "AUTH, rl="+str(rl)
|
|
|
|
|
|
def read_varint(sock, rl):
    """Read a variable-length integer from *sock*, one byte at a time.

    Each byte contributes its low seven bits, least significant group first;
    a clear top bit ends the integer.  Returns ``(value, rl)`` where *rl* has
    been reduced by the number of bytes consumed.
    """
    value = 0
    shift = 0
    while True:
        (octet,) = struct.unpack("!B", sock.recv(1))
        value += (octet & 127) << shift
        shift += 7
        rl -= 1
        if octet & 128 == 0x00:
            return (value, rl)
|
|
|
|
|
|
def mqtt_read_string(sock, rl):
    """Read a string with a 16-bit length prefix from *sock*.

    Returns ``(payload, rl)`` where *payload* is the raw bytes and *rl* has
    been reduced by the two length bytes plus the payload length.
    """
    (length,) = struct.unpack("!H", sock.recv(2))
    raw = sock.recv(length)
    (text,) = struct.unpack("!%ds" % (length), raw)
    return (text, rl - (2 + length))
|
|
|
|
|
|
def read_publish(sock, proto_ver=4):
    """Read one PUBLISH packet from *sock* and return its payload as a str.

    Raises ValueError when the first byte is not a PUBLISH command.  The
    topic, the packet identifier (QoS > 0) and, for *proto_ver* 5, the
    properties are consumed and discarded.  The payload is decoded as UTF-8.
    """
    cmd, = struct.unpack("!B", sock.recv(1))
    if cmd & 0xF0 != 0x30:
        raise ValueError

    qos = (cmd & 0x06) >> 1
    rl, _ = read_varint(sock, 0)
    _, rl = mqtt_read_string(sock, rl)

    if qos > 0:
        # The packet identifier is two bytes long, so both are subtracted
        # from the remaining length.
        sock.recv(2)
        rl -= 2

    if proto_ver == 5:
        proplen, rl = read_varint(sock, rl)
        sock.recv(proplen)
        rl -= proplen

    payload = sock.recv(rl).decode('utf-8')
    return payload
|
|
|
|
|
|
def gen_fixed_hdr(command, remaining_length):
    """Return an MQTT fixed header: the command byte plus the encoded length."""
    command_byte = struct.pack("B", command)
    length_bytes = pack_remaining_length(remaining_length)
    return command_byte + length_bytes
|
|
|
|
def gen_variable_hdr(mid=None):
    """Return *mid* as a big-endian 16-bit value, or ``b""`` when it is None."""
    return b"" if mid is None else struct.pack("!H", mid)
|
|
|
|
|
|
def gen_connect(client_id, clean_session=True, keepalive=60, username=None, password=None, will_topic=None, will_qos=0, will_retain=False, will_payload=b"", proto_ver=4, connect_reserved=False, properties=b"", will_properties=b"", session_expiry=-1):
    """Build a CONNECT packet and return it as bytes.

    Protocol versions whose low seven bits are 3 (or version 0) use the
    ``MQIsdp`` protocol name; versions whose low seven bits are 4, and
    version 5, use ``MQTT``.  Any other version raises ValueError.

    For version 5 a receive-maximum property of 20 is added when *properties*
    is empty, a session-expiry property is added when *session_expiry* is not
    -1, and *will_properties* are emitted with the will.  String arguments
    (*client_id*, *will_topic*, *username*, *password*) are encoded as UTF-8;
    *will_payload* must already be bytes.
    """
    # NOTE(review): the nesting of the password handling relative to the
    # username handling could not be recovered from the source; they are
    # treated as independent here - confirm against the upstream file.
    legacy_name = (proto_ver & 0x7F) == 3 or proto_ver == 0
    if legacy_name:
        remaining_length = 12
    elif (proto_ver & 0x7F) == 4 or proto_ver == 5:
        remaining_length = 10
    else:
        raise ValueError

    if client_id is not None:
        client_id = client_id.encode("utf-8")
        remaining_length += 2 + len(client_id)
    else:
        remaining_length += 2

    connect_flags = 0
    if connect_reserved:
        connect_flags |= 0x01
    if clean_session:
        connect_flags |= 0x02

    if proto_ver == 5:
        if properties == b"":
            properties += mqtt5_props.gen_uint16_prop(mqtt5_props.PROP_RECEIVE_MAXIMUM, 20)

        if session_expiry != -1:
            properties += mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, session_expiry)

        properties = mqtt5_props.prop_finalise(properties)
        remaining_length += len(properties)

    if will_topic is not None:
        will_topic = will_topic.encode("utf-8")
        remaining_length += 2 + len(will_topic) + 2 + len(will_payload)
        connect_flags |= 0x04 | ((will_qos & 0x03) << 3)
        if will_retain:
            connect_flags |= 32
        if proto_ver == 5:
            will_properties = mqtt5_props.prop_finalise(will_properties)
            remaining_length += len(will_properties)

    if username is not None:
        username = username.encode("utf-8")
        remaining_length += 2 + len(username)
        connect_flags |= 0x80
    if password is not None:
        password = password.encode("utf-8")
        connect_flags |= 0x40
        remaining_length += 2 + len(password)

    # Fixed header, then protocol name / version / flags / keepalive.
    rl = pack_remaining_length(remaining_length)
    packet = struct.pack("!B" + str(len(rl)) + "s", 0x10, rl)
    if legacy_name:
        packet += struct.pack("!H6sBBH", len(b"MQIsdp"), b"MQIsdp", proto_ver, connect_flags, keepalive)
    else:
        packet += struct.pack("!H4sBBH", len(b"MQTT"), b"MQTT", proto_ver, connect_flags, keepalive)

    if proto_ver == 5:
        packet += properties

    if client_id is not None:
        packet += struct.pack("!H" + str(len(client_id)) + "s", len(client_id), bytes(client_id))
    else:
        packet += struct.pack("!H", 0)

    if will_topic is not None:
        packet += will_properties
        packet += struct.pack("!H" + str(len(will_topic)) + "s", len(will_topic), will_topic)
        if len(will_payload) > 0:
            packet += struct.pack("!H" + str(len(will_payload)) + "s", len(will_payload), will_payload)
        else:
            packet += struct.pack("!H", 0)

    if username is not None:
        packet += struct.pack("!H" + str(len(username)) + "s", len(username), username)
    if password is not None:
        packet += struct.pack("!H" + str(len(password)) + "s", len(password), password)
    return packet
|
|
|
|
def gen_connack(flags=0, rc=0, proto_ver=4, properties=b"", property_helper=True):
    """Build a CONNACK packet and return it as bytes.

    For *proto_ver* 5 with *property_helper* enabled, a topic-alias-maximum
    of 10 is put before *properties* and a receive-maximum of 20 after them;
    passing ``properties=None`` produces an empty property set instead.  For
    other protocol versions the packet is the four byte form.
    """
    if proto_ver != 5:
        return struct.pack('!BBBB', 32, 2, flags, rc)

    if property_helper == True:
        if properties is None:
            properties = b""
        else:
            leading = mqtt5_props.gen_uint16_prop(mqtt5_props.PROP_TOPIC_ALIAS_MAXIMUM, 10)
            trailing = mqtt5_props.gen_uint16_prop(mqtt5_props.PROP_RECEIVE_MAXIMUM, 20)
            properties = leading + properties + trailing
    properties = mqtt5_props.prop_finalise(properties)

    return struct.pack('!BBBB', 32, 2+len(properties), flags, rc) + properties
|
|
|
|
def gen_publish(topic, qos, payload=None, retain=False, dup=False, mid=0, proto_ver=4, properties=b""):
    """Build a PUBLISH packet and return it as bytes.

    *topic* is encoded as UTF-8.  *payload* may be bytes, a str (encoded as
    UTF-8) or None for an empty payload.  The message id *mid* is included
    only when *qos* is above 0, and *properties* only for *proto_ver* 5.
    """
    topic = topic.encode("utf-8")
    body = struct.pack("!H", len(topic)) + topic
    if qos > 0:
        body += struct.pack("!H", mid)

    if proto_ver == 5:
        properties = mqtt5_props.prop_finalise(properties)
        body += properties

    if payload is None:
        payload = b""
    elif isinstance(payload, bytes) == False:
        payload = payload.encode("utf-8")
    body += payload

    # Flags live in the low nibble of the command byte.
    cmd = 48 | (qos << 1)
    if retain:
        cmd += 1
    if dup:
        cmd += 8

    return struct.pack("!B", cmd) + pack_remaining_length(len(body)) + body
|
|
|
|
def _gen_command_with_mid(cmd, mid, proto_ver=4, reason_code=-1, properties=None):
    """Build a packet that carries only a message id (PUBACK, PUBREC, ...).

    The short form (command, length 2, mid) is used unless *proto_ver* is 5
    and a *reason_code* other than -1 or a *properties* value was given.  In
    the long form a *reason_code* of -1 is sent as 0, ``properties=None``
    omits the property section and an empty *properties* value (``b""`` or
    ``""``) sends a zero property length.
    """
    if proto_ver != 5 or (reason_code == -1 and properties is None):
        return struct.pack('!BBH', cmd, 2, mid)

    if reason_code == -1:
        reason_code = 0

    if properties is None:
        return struct.pack('!BBHB', cmd, 3, mid, reason_code)
    if len(properties) == 0:
        # Checked by length so that both b"" and "" take this branch.
        return struct.pack('!BBHBB', cmd, 4, mid, reason_code, 0)

    properties = mqtt5_props.prop_finalise(properties)
    pack_format = "!BBHB" + str(len(properties)) + "s"
    return struct.pack(pack_format, cmd, 2+1+len(properties), mid, reason_code, properties)
|
|
|
|
def gen_puback(mid, proto_ver=4, reason_code=-1, properties=None):
    """Build a PUBACK packet (command byte 0x40) for message id *mid*."""
    return _gen_command_with_mid(0x40, mid, proto_ver=proto_ver, reason_code=reason_code, properties=properties)
|
|
|
|
def gen_pubrec(mid, proto_ver=4, reason_code=-1, properties=None):
    """Build a PUBREC packet (command byte 0x50) for message id *mid*."""
    return _gen_command_with_mid(0x50, mid, proto_ver=proto_ver, reason_code=reason_code, properties=properties)
|
|
|
|
def gen_pubrel(mid, dup=False, proto_ver=4, reason_code=-1, properties=None):
    """Build a PUBREL packet for message id *mid*.

    The command byte is 0x62, with bit 3 additionally set when *dup* is true.
    """
    cmd = 0x62 | (0x08 if dup else 0x00)
    return _gen_command_with_mid(cmd, mid, proto_ver, reason_code, properties)
|
|
|
|
def gen_pubcomp(mid, proto_ver=4, reason_code=-1, properties=None):
    """Build a PUBCOMP packet (command byte 0x70) for message id *mid*."""
    return _gen_command_with_mid(0x70, mid, proto_ver=proto_ver, reason_code=reason_code, properties=properties)
|
|
|
|
|
|
def gen_subscribe(mid, topic, qos, cmd=130, proto_ver=4, properties=b""):
    """Build a SUBSCRIBE packet for a single *topic* and return it as bytes.

    *topic* is encoded as UTF-8 and followed by the *qos* byte.  For
    *proto_ver* 5 a property section precedes the topic: a single zero byte
    when *properties* is empty, the finalised *properties* otherwise.
    """
    topic = topic.encode("utf-8")
    command_byte = struct.pack("!B", cmd)
    topic_filter = struct.pack("!H" + str(len(topic)) + "sB", len(topic), topic, qos)

    if proto_ver != 5:
        length = pack_remaining_length(2+2+len(topic)+1)
        return command_byte + length + struct.pack("!H", mid) + topic_filter

    if properties == b"":
        length = pack_remaining_length(2+1+2+len(topic)+1)
        return command_byte + length + struct.pack("!HB", mid, 0) + topic_filter

    properties = mqtt5_props.prop_finalise(properties)
    length = pack_remaining_length(2+1+2+len(topic)+len(properties))
    return command_byte + length + struct.pack("!H", mid) + properties + topic_filter
|
|
|
|
|
|
def gen_suback(mid, qos, proto_ver=4):
    """Build a SUBACK packet granting *qos* for message id *mid*.

    For *proto_ver* 5 a zero byte (empty property section) is placed between
    the message id and the granted QoS.
    """
    if proto_ver != 5:
        return struct.pack('!BBHB', 144, 2+1, mid, qos)
    return struct.pack('!BBHBB', 144, 2+1+1, mid, 0, qos)
|
|
|
|
def gen_unsubscribe(mid, topic, cmd=162, proto_ver=4, properties=b""):
    """Build an UNSUBSCRIBE packet for a single *topic* and return it as bytes.

    *topic* is encoded as UTF-8.  For *proto_ver* 5 a property section
    precedes the topic: a single zero byte when *properties* is empty, the
    finalised *properties* otherwise.
    """
    topic = topic.encode("utf-8")
    topic_filter = struct.pack("!H" + str(len(topic)) + "s", len(topic), topic)

    if proto_ver != 5:
        return struct.pack("!BBH", cmd, 2+2+len(topic), mid) + topic_filter

    if properties == b"":
        return struct.pack("!BBHB", cmd, 2+2+len(topic)+1, mid, 0) + topic_filter

    properties = mqtt5_props.prop_finalise(properties)
    header = struct.pack("!B", cmd) + pack_remaining_length(2+2+len(topic)+len(properties))
    return header + struct.pack("!H", mid) + properties + topic_filter
|
|
|
|
def gen_unsubscribe_multiple(mid, topics, proto_ver=4):
    """Build an UNSUBSCRIBE packet that lists every topic in *topics*.

    Each topic is encoded as UTF-8 with a 16-bit length prefix.  For
    *proto_ver* 5 a zero byte (empty property section) follows the message id.
    """
    encoded = [name.encode("utf-8") for name in topics]
    filters = b"".join(struct.pack("!H" + str(len(t)) + "s", len(t), t) for t in encoded)
    filters_length = sum(2 + len(t) for t in encoded)

    if proto_ver == 5:
        return struct.pack("!BBHB", 162, filters_length + 2 + 1, mid, 0) + filters
    return struct.pack("!BBH", 162, filters_length + 2, mid) + filters
|
|
|
|
def gen_unsuback(mid, reason_code=0, proto_ver=4):
    """Build an UNSUBACK packet for message id *mid*.

    For *proto_ver* 5 the packet carries an empty property section followed
    by *reason_code*, which may be a single value or a list of values.  Other
    protocol versions ignore *reason_code*.
    """
    if proto_ver != 5:
        return struct.pack('!BBH', 176, 2, mid)

    if not isinstance(reason_code, list):
        return struct.pack('!BBHBB', 176, 4, mid, 0, reason_code)

    header = struct.pack('!BBHB', 176, 3+len(reason_code), mid, 0)
    return header + b"".join(struct.pack('B', code) for code in reason_code)
|
|
|
|
def gen_pingreq():
    """Return a PINGREQ packet: command byte 0xC0 with zero remaining length."""
    return struct.pack('!BB', 0xC0, 0)
|
|
|
|
def gen_pingresp():
    """Return a PINGRESP packet: command byte 0xD0 with zero remaining length."""
    return struct.pack('!BB', 0xD0, 0)
|
|
|
|
|
|
def _gen_short(cmd, reason_code=-1, proto_ver=5, properties=None):
    """Build a packet without a message id (DISCONNECT, AUTH).

    The two byte form (command, length 0) is used unless *proto_ver* is 5 and
    a *reason_code* other than -1 or a *properties* value was given.  In the
    long form a *reason_code* of -1 is sent as 0, ``properties=None`` omits
    the property section and an empty *properties* value (``b""`` or ``""``)
    sends a zero property length.
    """
    if proto_ver != 5 or (reason_code == -1 and properties is None):
        return struct.pack('!BB', cmd, 0)

    if reason_code == -1:
        reason_code = 0

    if properties is None:
        return struct.pack('!BBB', cmd, 1, reason_code)
    if len(properties) == 0:
        # Checked by length so that both b"" and "" take this branch.
        return struct.pack('!BBBB', cmd, 2, reason_code, 0)

    properties = mqtt5_props.prop_finalise(properties)
    return struct.pack("!BBB", cmd, 1+len(properties), reason_code) + properties
|
|
|
|
def gen_disconnect(reason_code=-1, proto_ver=4, properties=None):
    """Build a DISCONNECT packet (command byte 0xE0)."""
    return _gen_short(0xE0, reason_code=reason_code, proto_ver=proto_ver, properties=properties)
|
|
|
|
def gen_auth(reason_code=-1, properties=None):
    """Build an AUTH packet (command byte 0xF0); always uses protocol version 5."""
    return _gen_short(0xF0, reason_code=reason_code, proto_ver=5, properties=properties)
|
|
|
|
|
|
def pack_remaining_length(remaining_length):
    """Encode *remaining_length* as a variable-length integer.

    Seven bits are emitted per byte, least significant group first; the top
    bit of a byte is set when more bytes follow.
    """
    encoded = b""
    while True:
        remaining_length, digit = divmod(remaining_length, 128)
        # If there are more digits to encode, set the top bit of this digit
        if remaining_length > 0:
            digit |= 0x80

        encoded += struct.pack("!B", digit)
        if remaining_length == 0:
            return encoded
|
|
|
|
|
|
def get_port(count=1):
    """Return the port number(s) a test should use.

    With ``count == 1`` a single int is returned: the only command line
    argument when exactly one was given, otherwise 1888.  For any other
    *count* a tuple is returned: the first *count* command line arguments
    when enough were given, otherwise consecutive ports starting at 1888.
    """
    argv = sys.argv
    if count == 1:
        return int(argv[1]) if len(argv) == 2 else 1888

    if len(argv) < 1 + count:
        return tuple(range(1888, 1888 + count))
    return tuple(int(arg) for arg in argv[1:1 + count])
|
|
|
|
|
|
def do_ping(sock, error_string="pingresp"):
    """Send a PINGREQ on *sock* and expect a PINGRESP in reply."""
    request = gen_pingreq()
    response = gen_pingresp()
    do_send_receive(sock, request, response, error_string)
|
|
|
|
def client_test(client_cmd, client_args, callback, cb_data):
    """Run a library test client against a scripted fake broker, then exit.

    A listening socket is opened on the test port, the client program
    ``<build root>/test/lib/<client_cmd>`` is started with the port (plus
    *client_args*, when given) as arguments, and ``callback(conn, cb_data)``
    is called with the accepted connection.  The process exits with status 0
    when the callback returned and the client finished on its own, and with
    a non-zero status otherwise; on failure the client's output is printed.
    """
    port = get_port()

    rc = 1

    sock = listen_sock(port)

    args = [get_build_root() + "/test/lib/" + client_cmd, str(port)]
    if client_args is not None:
        args = args + client_args

    client = start_client(filename=client_cmd.replace('/', '-'), cmd=args)

    try:
        (conn, address) = sock.accept()
        conn.settimeout(10)

        callback(conn, cb_data)
        rc = 0

        conn.close()
    except TestError:
        # The mismatch was already reported; rc stays non-zero.  TestError
        # is referenced directly because this module does not import itself
        # under the name mosq_test.
        pass
    except Exception as err:
        print(err)
        raise
    finally:
        if wait_for_subprocess(client):
            print("test client not finished")
            rc = 1
        sock.close()
        if rc:
            (o, e) = client.communicate()
            print(o)
            print(e)
            print(f"Fail: {client_cmd}")

    # NOTE(review): the original nesting of this exit call was not
    # recoverable; placed after the finally block so that an unexpected
    # exception still propagates with its traceback - confirm.
    sys.exit(rc)
|
|
|
|
|
|
# =============================================
|
|
# Websockets wrapper
|
|
# =============================================
|
|
class WebsocketConnectionError(ValueError):
    """Raised when the websocket opening handshake fails."""
|
|
|
|
class WebsocketWrapper(object):
|
|
OPCODE_CONTINUATION = 0x0
|
|
OPCODE_TEXT = 0x1
|
|
OPCODE_BINARY = 0x2
|
|
OPCODE_CONNCLOSE = 0x8
|
|
OPCODE_PING = 0x9
|
|
OPCODE_PONG = 0xa
|
|
|
|
def __init__(self, socket, host, port, is_ssl, path, extra_headers):
|
|
|
|
self.connected = False
|
|
|
|
self._ssl = is_ssl
|
|
self._host = host
|
|
self._port = port
|
|
self._socket = socket
|
|
self._path = path
|
|
|
|
self._sendbuffer = bytearray()
|
|
self._readbuffer = bytearray()
|
|
|
|
self._requested_size = 0
|
|
self._payload_head = 0
|
|
self._readbuffer_head = 0
|
|
|
|
self._do_handshake(extra_headers)
|
|
|
|
def __del__(self):
|
|
|
|
self._sendbuffer = None
|
|
self._readbuffer = None
|
|
|
|
def _do_handshake(self, extra_headers):
|
|
|
|
sec_websocket_key = uuid.uuid4().bytes
|
|
sec_websocket_key = base64.b64encode(sec_websocket_key)
|
|
|
|
websocket_headers = {
|
|
"Host": "{self._host:s}:{self._port:d}".format(self=self),
|
|
"Upgrade": "websocket",
|
|
"Connection": "Upgrade",
|
|
"Origin": "https://{self._host:s}:{self._port:d}".format(self=self),
|
|
"Sec-WebSocket-Key": sec_websocket_key.decode("utf8"),
|
|
"Sec-Websocket-Version": "13",
|
|
"Sec-Websocket-Protocol": "mqtt",
|
|
}
|
|
|
|
# This is checked in ws_set_options so it will either be None, a
|
|
# dictionary, or a callable
|
|
if isinstance(extra_headers, dict):
|
|
websocket_headers.update(extra_headers)
|
|
elif callable(extra_headers):
|
|
websocket_headers = extra_headers(websocket_headers)
|
|
|
|
header = "\r\n".join([
|
|
"GET {self._path} HTTP/1.1".format(self=self),
|
|
"\r\n".join("{}: {}".format(i, j)
|
|
for i, j in websocket_headers.items()),
|
|
"\r\n",
|
|
]).encode("utf8")
|
|
|
|
self._socket.send(header)
|
|
|
|
has_secret = False
|
|
has_upgrade = False
|
|
|
|
while True:
|
|
# read HTTP response header as lines
|
|
byte = self._socket.recv(1)
|
|
|
|
self._readbuffer.extend(byte)
|
|
|
|
# line end
|
|
if byte == b"\n":
|
|
if len(self._readbuffer) > 2:
|
|
# check upgrade
|
|
if b"connection" in str(self._readbuffer).lower().encode('utf-8'):
|
|
if b"upgrade" not in str(self._readbuffer).lower().encode('utf-8'):
|
|
raise WebsocketConnectionError(
|
|
"WebSocket handshake error, connection not upgraded")
|
|
else:
|
|
has_upgrade = True
|
|
|
|
# check key hash
|
|
if b"sec-websocket-accept" in str(self._readbuffer).lower().encode('utf-8'):
|
|
GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
|
|
|
|
server_hash = self._readbuffer.decode(
|
|
'utf-8').split(": ", 1)[1]
|
|
server_hash = server_hash.strip().encode('utf-8')
|
|
|
|
client_hash = sec_websocket_key.decode('utf-8') + GUID
|
|
client_hash = hashlib.sha1(client_hash.encode('utf-8'))
|
|
client_hash = base64.b64encode(client_hash.digest())
|
|
|
|
if server_hash != client_hash:
|
|
raise WebsocketConnectionError(
|
|
"WebSocket handshake error, invalid secret key")
|
|
else:
|
|
has_secret = True
|
|
else:
|
|
# ending linebreak
|
|
break
|
|
|
|
# reset linebuffer
|
|
self._readbuffer = bytearray()
|
|
|
|
# connection reset
|
|
elif not byte:
|
|
raise WebsocketConnectionError("WebSocket handshake error")
|
|
|
|
if not has_upgrade or not has_secret:
|
|
raise WebsocketConnectionError("WebSocket handshake error")
|
|
|
|
self._readbuffer = bytearray()
|
|
self.connected = True
|
|
|
|
def _create_frame(self, opcode, data, do_masking=1):
    """Build one final (FIN bit set) websocket frame around `data`.

    Args:
        opcode: Frame opcode, OR-ed into the first header byte.
        data: Payload as any bytes-like object (bytes, bytearray, ...).
            It is never modified; masking is applied to a private copy.
        do_masking: 1 to mask the payload with a random 4-byte key and
            set the mask bit, 0 to emit the payload unmasked.

    Returns:
        A bytearray holding the header, the mask key (only when masking)
        and the payload.

    Raises:
        ValueError: if the payload length does not fit the length field.
    """
    header = bytearray()
    length = len(data)
    mask_flag = do_masking

    # 1 << 7 is the final flag, we don't send continuation frames
    header.append(1 << 7 | opcode)

    # The second byte carries the mask bit plus either the length itself
    # or a marker (126 / 127) announcing a 16 / 64 bit extended length.
    if length < 126:
        header.append(mask_flag << 7 | length)
    elif length < 65536:
        header.append(mask_flag << 7 | 126)
        header += struct.pack("!H", length)
    elif length < 0x8000000000000001:
        header.append(mask_flag << 7 | 127)
        header += struct.pack("!Q", length)
    else:
        raise ValueError("Maximum payload size is 2^63")

    if mask_flag == 1:
        # Mask into a fresh buffer: the caller's object stays untouched and
        # immutable payloads such as bytes are accepted as well.
        mask_key = os.urandom(4)
        masked = bytearray(
            byte ^ mask_key[index % 4] for index, byte in enumerate(data)
        )
        return header + mask_key + masked

    return header + data
|
|
|
|
def _buffered_read(self, length):
    """Return the next `length` bytes of the read buffer, receiving if needed.

    Bytes are consumed starting at `self._readbuffer_head`. When the buffer
    does not hold enough unread bytes, a single `recv` for the missing
    amount is issued on the wrapped socket and its data is appended.

    Raises:
        ConnectionAbortedError: the socket returned no data.
        BlockingIOError: fewer bytes than needed arrived; what did arrive
            stays in the buffer.
    """
    unread = len(self._readbuffer) - self._readbuffer_head
    missing = length - unread

    # top up the buffer when it cannot satisfy the request on its own
    if missing > 0:
        received = self._socket.recv(missing)

        if not received:
            raise ConnectionAbortedError

        self._readbuffer.extend(received)

        if len(received) < missing:
            print(f"{len(received)} {missing}")
            raise BlockingIOError

    # advance the read position and hand back the bytes just passed over
    start = self._readbuffer_head
    self._readbuffer_head = start + length
    return self._readbuffer[start:self._readbuffer_head]
|
|
|
|
def _recv_impl(self, length):
    """Decode payload bytes of the current websocket frame.

    The frame header is re-parsed from the start of the read buffer on
    every call; `self._payload_head` records how much of the frame's
    payload earlier calls already returned.

    Returns:
        The newly decoded payload chunk for binary / continuation frames
        with a non-empty payload, otherwise b"". If a ConnectionError is
        raised while reading, `self.connected` is set to False and b'' is
        returned. Other exceptions propagate to the caller.
    """
    # try to decode websocket payload part from data
    try:
        # parsing always restarts at the beginning of the buffered frame
        self._readbuffer_head = 0

        result = None

        chunk_start = self._payload_head
        chunk_end = chunk_start + length

        first_byte = self._buffered_read(1)
        second_byte = self._buffered_read(1)

        opcode = first_byte[0] & 0x0f
        masked = (second_byte[0] & 0x80) == 0x80
        length_code = second_byte[0] & 0x7f
        payload_length = length_code
        mask_key = None

        # 0x7e / 0x7f announce a 16 / 64 bit extended payload length
        if length_code == 0x7e:
            raw_length = self._buffered_read(2)
            payload_length, = struct.unpack("!H", raw_length)
        elif length_code == 0x7f:
            raw_length = self._buffered_read(8)
            payload_length, = struct.unpack("!Q", raw_length)

        # read mask
        if masked:
            mask_key = self._buffered_read(4)

        # never read past the end of the frame's payload
        read_end = min(chunk_end, payload_length)

        if read_end > 0:
            # get payload chunk
            payload = self._buffered_read(read_end)

            # unmask only the part that has not been returned before
            if masked:
                for position in range(chunk_start, read_end):
                    payload[position] ^= mask_key[position % 4]

            result = payload[chunk_start:read_end]
            self._payload_head = read_end
        else:
            payload = bytearray()

        # whole frame consumed: drop the buffer and restart payload tracking
        if read_end == payload_length:
            self._readbuffer = bytearray()
            self._payload_head = 0

        # respond to non-binary opcodes, their arrival is not guaranteed
        # because of non-blocking sockets
        if opcode == WebsocketWrapper.OPCODE_CONNCLOSE:
            close_frame = self._create_frame(
                WebsocketWrapper.OPCODE_CONNCLOSE, payload, 0)
            self._socket.send(close_frame)

        if opcode == WebsocketWrapper.OPCODE_PING:
            pong_frame = self._create_frame(
                WebsocketWrapper.OPCODE_PONG, payload, 0)
            self._socket.send(pong_frame)

        # This isn't *proper* handling of continuation frames, but given
        # that we only support binary frames, it is *probably* good enough.
        carries_data = (opcode == WebsocketWrapper.OPCODE_BINARY
                        or opcode == WebsocketWrapper.OPCODE_CONTINUATION)
        if not carries_data or payload_length <= 0:
            return b""
        return result

    except ConnectionError:
        self.connected = False
        return b''
|
|
|
|
def _send_impl(self, data):
    """Send `data` as one binary websocket frame, resuming partial writes.

    Returns:
        The payload size recorded when the frame was created once the
        whole frame has been written, or 0 while part of the frame is
        still waiting in the send buffer.
    """
    # an empty send buffer means the previous frame went out completely,
    # so `data` starts a new frame
    if not self._sendbuffer:
        new_frame = self._create_frame(
            WebsocketWrapper.OPCODE_BINARY, bytearray(data))
        self._sendbuffer.extend(new_frame)
        self._requested_size = len(data)

    # try to write out as much as possible
    sent = self._socket.send(self._sendbuffer)
    self._sendbuffer = self._sendbuffer[sent:]

    if self._sendbuffer:
        # couldn't send whole data, request the same data again with 0 as sent length
        return 0

    # buffer sent out completely, return with payload's size
    return self._requested_size
|
|
|
|
def recv(self, length):
    """Socket-style entry point; delegates to `_recv_impl(length)`."""
    received = self._recv_impl(length)
    return received
|
|
|
|
def read(self, length):
    """File-style entry point; delegates to `_recv_impl(length)`."""
    received = self._recv_impl(length)
    return received
|
|
|
|
def send(self, data):
    """Socket-style entry point; delegates to `_send_impl(data)`."""
    written = self._send_impl(data)
    return written
|
|
|
|
def write(self, data):
    """File-style entry point; delegates to `_send_impl(data)`."""
    written = self._send_impl(data)
    return written
|
|
|
|
def close(self):
    """Close the wrapped socket."""
    wrapped = self._socket
    wrapped.close()
|
|
|
|
def fileno(self):
    """Return whatever the wrapped socket's `fileno()` reports."""
    descriptor = self._socket.fileno()
    return descriptor
|
|
|
|
def pending(self):
    """Return the wrapped socket's `pending()` value for SSL, else 0."""
    # normal socket rely only on select()
    if not self._ssl:
        return 0

    # Fix for bug #131: a SSL socket may still have data available
    # for reading without select() being aware of it.
    return self._socket.pending()
|
|
|
|
def setblocking(self, flag):
    """Forward the blocking-mode `flag` to the wrapped socket."""
    wrapped = self._socket
    wrapped.setblocking(flag)
|
|
|
|
|
|
@atexit.register
def test_cleanup():
    """Delete empty valgrind log files at interpreter exit.

    Does nothing unless the MOSQ_USE_VALGRIND environment variable is set.
    Every path in the module-level `vg_logfiles` list whose file has size
    zero is removed; files that cannot be stat'ed or removed are ignored.
    """
    if os.environ.get('MOSQ_USE_VALGRIND') is None:
        return

    for logfile in vg_logfiles:
        try:
            is_empty = os.stat(logfile).st_size == 0
            if is_empty:
                os.remove(logfile)
        except OSError:
            # best effort: a missing or locked log file is not an error
            pass
|