Files
MIT6.828_OS/lab/grade-lab6
2019-07-17 16:08:06 +08:00

254 lines
7.7 KiB
Python

#!/usr/bin/env python
import os, re, threading, socket, time, shutil, struct, difflib
try:
from urllib2 import urlopen, HTTPError
except ImportError:
# Python 3
from urllib.request import urlopen
from urllib.error import HTTPError
from gradelib import *
r = Runner(save("jos.out"),
stop_breakpoint("readline"))
def match_packet_seq(got, expect):
got, expect = list(got), list(expect)
s = difflib.SequenceMatcher(None, got, [d for n, d in expect])
msgs, bad = [], False
for tag, i1, i2, j1, j2 in s.get_opcodes():
wrong = []
if tag in ("delete", "replace"):
wrong.append([("unexpected packet:", pkt) for pkt in got[i1:i2]])
if tag in ("insert", "replace"):
wrong.append([("missing %s:" % n, pkt) for n, pkt in expect[j1:j2]])
for seq in wrong:
bad = True
for msg, pkt in seq[:3]:
msgs.append(color("red", msg) + "\n" + hexdump(pkt))
if seq[3:]:
msgs.append("... plus %d more" % len(seq[3:]))
if tag == "equal":
msgs.append(color("green", "got ") + expect[j1][0])
if j1 + 1 < j2:
msgs[-1] += " through " + expect[j2 - 1][0]
assert not bad, "\n".join(msgs)
def save_pcap_on_fail():
def save_pcap(fail):
save_path = "qemu.pcap." + get_current_test().__name__[5:]
if fail and os.path.exists("qemu.pcap"):
shutil.copyfile("qemu.pcap", save_path)
print(" Packet capture saved to %s" % save_path)
elif not fail and os.path.exists(save_path):
os.unlink(save_path)
print(" (Old %s failed packet capture deleted)" % save_path)
get_current_test().on_finish.append(save_pcap)
def read_pcap():
f = open("qemu.pcap", "rb")
s = struct.Struct("=IHHiIII")
hdr = s.unpack(f.read(s.size))
assert_equal(hdr[0], 0xa1b2c3d4, "Bad pcap magic number")
assert_equal((hdr[1], hdr[2]), (2, 4), "Bad pcap version number")
s = struct.Struct("=iiII")
while True:
hdr_data = f.read(s.size)
if not len(hdr_data):
return
hdr = s.unpack(hdr_data)
yield bytes(f.read(hdr[2]))
def ascii_to_bytes(s):
if bytes == str:
# Python 2
return s
else:
# Python 3
return bytes(s, "ascii")
def hexdump(data):
data = bytearray(data)
buf = []
for i in range(0, len(data), 16):
chunk = data[i:i+16]
hd = "".join("%02x" % b for b in chunk)
hd = " ".join(hd[j:j+4] for j in range(0, len(hd), 4))
ad = "".join(chr(c) if chr(c).isalnum() else "." for c in chunk)
buf.append("%04x %-40s %s" % (i, hd, ad))
return "\n".join(buf)
echo_port = QEMU.get_gdb_port() + 1
http_port = QEMU.get_gdb_port() + 2
socket.setdefaulttimeout(5)
#
# Basic tests
#
@test(5)
def test_testtime():
r.user_test("testtime", make_args=["INIT_CFLAGS=-DTEST_NO_NS"])
r.match(r'starting count down: 5 4 3 2 1 0 ')
@test(5)
def test_pci_attach():
r.user_test("hello", make_args=["INIT_CFLAGS=-DTEST_NO_NS"])
r.match(r'PCI function 00:03.0 \(8086:100e\) enabled')
#
# testoutput
#
def test_testoutput_helper(count):
save_pcap_on_fail()
maybe_unlink("qemu.pcap")
r.user_test("net_testoutput",
make_args=["NET_CFLAGS=-DTESTOUTPUT_COUNT=%d" % count])
# Check the packet capture
got = list(read_pcap())
expect = [("packet %d/%d" % (i+1, count), ascii_to_bytes("Packet %02d" % i))
for i in range(count)]
match_packet_seq(got, expect)
@test(15, "testoutput [5 packets]")
def test_testoutput_5():
test_testoutput_helper(5)
@test(10, "testoutput [100 packets]")
def test_testoutput_100():
test_testoutput_helper(100)
end_part("A")
#
# testinput
#
def test_testinput_helper(count):
save_pcap_on_fail()
maybe_unlink("qemu.pcap")
def send_packets():
# Send 'count' UDP packets
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.connect(("127.0.0.1", echo_port))
for i in range(count):
sock.send(ascii_to_bytes("Packet %03d" % i))
send_thread = threading.Thread(target=send_packets)
# When we see the final packet, we can stop QEMU
digits = "%03d" % (count-1)
final = "input: 0030 203%s 3%s3%s" % tuple("%03d" % (count-1))
r.user_test("net_testinput",
call_on_line("Waiting for packets",
lambda _: send_thread.start()),
stop_on_line(final))
if send_thread.isAlive():
send_thread.join()
# Parse testinput hexdumps for received packets
got = []
for off, dump in re.findall("input: ([0-9a-f]{4}) ([0-9a-f ]*)\n",
r.qemu.output):
if off == "0000" or not got:
got.append(bytearray())
dump = dump.replace(" ", "")
for i in range(0, len(dump), 2):
got[-1].append(int(dump[i:i+2], 16))
got = map(bytes, got)
# Get the packets QEMU actually received, since there's some
# header information we can't easily guess.
pcap = list(read_pcap())
if len(pcap) != count + 2:
raise RuntimeError("pcap contains only %d packets" % len(pcap))
# Strip transmitted ARP request
assert pcap.pop(0)[:6] == b"\xff" * 6, "First packet is not ARP request"
# The E1000 pads packets out to a 60 byte frame
pcap = [pkt.ljust(60, b"\0") for pkt in pcap]
names = ["ARP reply"] + ["packet %d/%d" % (i+1, count)
for i in range(count)]
match_packet_seq(got, zip(names, pcap))
@test(15, "testinput [5 packets]")
def test_testinput_5():
test_testinput_helper(5)
@test(10, "testinput [100 packets]")
def test_testinput_100():
test_testinput_helper(100)
#
# Servers
#
@test(15, "tcp echo server [echosrv]")
def test_echosrv():
def ready(line):
expect = ascii_to_bytes("%s: network server works" % time.time())
got = bytearray()
sock = socket.socket()
try:
sock.settimeout(5)
sock.connect(("127.0.0.1", echo_port))
sock.sendall(expect)
while got != expect:
data = sock.recv(4096)
if not data:
break
got += data
except socket.error as e:
got += "[Socket error: %s]" % e
finally:
sock.close()
assert_equal(got, expect)
raise TerminateTest
save_pcap_on_fail()
r.user_test("echosrv", call_on_line("bound", ready))
r.match("bound", no=[".*panic"])
@test(0, "web server [httpd]")
def test_httpd():
pass
def mk_test_httpd(url, expect_code, expect_data):
fullurl = "http://localhost:%d%s" % (http_port, url)
def test_httpd_test():
def ready(line):
try:
# This uses the default socket timeout (5 seconds)
res = urlopen(fullurl)
got = "(Status 200)\n" + \
res.read().decode('utf-8')
except HTTPError as e:
got = "(Status %d)" % e.code
except IOError as e:
got = "(Error: %s)" % e
expect = "(Status %d)" % expect_code
if expect_data:
expect += "\n" + expect_data
assert_equal(got, expect)
raise TerminateTest
save_pcap_on_fail()
r.user_test("httpd",
call_on_line('Waiting for http connections', ready))
r.match('Waiting for http connections',
no=[".*panic"])
test_httpd_test.__name__ += url.replace("/", "-")
return test(10, fullurl, parent=test_httpd)(test_httpd_test)
mk_test_httpd("/", 404, "")
mk_test_httpd("/index.html", 200, open("fs/index.html").read())
mk_test_httpd("/random_file.txt", 404, "")
end_part("B")
run_tests()