407 lines
14 KiB
Python
407 lines
14 KiB
Python
from tests._helpers import (
|
|
unittest, parameterized, Fraction, time,
|
|
run_test, run_compiler_error_test,
|
|
ArrayIndex, ArraySlice, Bool, BinOp, BuiltIn, DataArray, DataDict, DataRangeArray,
|
|
Defini, Continva, Designa, DesignaDestructure, DesignaIndex, DumStatement,
|
|
Erumpe, ExpressionStatement, Fvnctio, ID, InterpolatedString, Invoca,
|
|
ModuleCall, Nullus, Numeral, PerStatement, Program, Redi, SiStatement,
|
|
String, TemptaStatement, UnaryMinus, UnaryNot, Fractio, frac_to_fraction,
|
|
fraction_to_frac, num_to_int, int_to_num, make_string,
|
|
ValInt, ValStr, ValBool, ValList, ValDict, ValNul, ValFunc, ValFrac,
|
|
CentvrionError, _RUNTIME_C, _cent_rng,
|
|
Lexer, Parser, compile_program,
|
|
os, subprocess, tempfile, StringIO, patch,
|
|
)
|
|
|
|
|
|
# --- DORMI ---
|
|
|
|
# Cases fed to TestDormi.test_dormi, in its (source, nodes, value) parameter order.
dormi_tests = [
    (
        "DORMI(NVLLVS)",
        Program([], [ExpressionStatement(BuiltIn("DORMI", [Nullus()]))]),
        ValNul(),
    ),
]
|
|
|
|
class TestDormi(unittest.TestCase):
    """Tests for the DORMI builtin, via the interpreter and the compiled C backend.

    The timing tests run a program consisting of a single DORMI call and
    compare the measured wall time with the duration each test expects.
    """

    # Bounds for the half-second tests.  The previous check,
    # assertAlmostEqual(elapsed, 0.5, delta=0.5), accepted elapsed == 0.0 and so
    # could not detect a DORMI that never sleeps.  The upper bound is unchanged.
    _HALF_SECOND_MIN = 0.4
    _HALF_SECOND_MAX = 1.0

    @parameterized.expand(dormi_tests)
    def test_dormi(self, source, nodes, value, output=""):
        """Run each ``dormi_tests`` case through the shared ``run_test`` helper."""
        run_test(self, source, nodes, value, output)

    @staticmethod
    def _parse(source):
        """Lex and parse ``source`` and return the resulting program node."""
        tokens = Lexer().get_lexer().lex(source)
        return Parser().parse(tokens)

    @staticmethod
    def _time_eval(program):
        """Evaluate ``program`` in-process and return the elapsed seconds."""
        # monotonic() cannot jump backwards/forwards like the wall clock can.
        start = time.monotonic()
        program.eval()
        return time.monotonic() - start

    def _time_compiled(self, program):
        """Compile ``program`` to C, build it with gcc and time one run.

        Only the execution of the built binary is timed, not the build.
        Asserts that the binary exits with status 0 and returns the elapsed
        seconds.  Raises ``subprocess.CalledProcessError`` if gcc fails.
        """
        c_source = compile_program(program)
        with tempfile.NamedTemporaryFile(suffix=".c", delete=False, mode="w") as tmp_c:
            tmp_c.write(c_source)
            tmp_c_path = tmp_c.name
        # Reserve a path for the output binary; gcc overwrites the empty file.
        with tempfile.NamedTemporaryFile(suffix="", delete=False) as tmp_bin:
            tmp_bin_path = tmp_bin.name
        try:
            subprocess.run(
                ["gcc", "-O2", tmp_c_path, _RUNTIME_C, "-o", tmp_bin_path, "-lcurl", "-lmicrohttpd"],
                check=True, capture_output=True,
            )
            start = time.monotonic()
            proc = subprocess.run([tmp_bin_path], capture_output=True, text=True)
            elapsed = time.monotonic() - start
            self.assertEqual(proc.returncode, 0)
            return elapsed
        finally:
            os.unlink(tmp_c_path)
            os.unlink(tmp_bin_path)

    def _assert_about_half_second(self, elapsed):
        """Assert ``elapsed`` lies within the half-second bounds defined on the class."""
        self.assertGreaterEqual(elapsed, self._HALF_SECOND_MIN)
        self.assertLessEqual(elapsed, self._HALF_SECOND_MAX)

    def test_dormi_timing_int(self):
        """Interpreted ``DORMI(I)`` should take about one second."""
        elapsed = self._time_eval(self._parse("DORMI(I)\n"))
        self.assertAlmostEqual(elapsed, 1.0, delta=0.5)

    def test_dormi_timing_int_compiled(self):
        """Compiled ``DORMI(I)`` should take about one second."""
        elapsed = self._time_compiled(self._parse("DORMI(I)\n"))
        self.assertAlmostEqual(elapsed, 1.0, delta=0.5)

    def test_dormi_timing_frac(self):
        """Interpreted ``DORMI(S)`` under ``CVM FRACTIO`` should take about half a second."""
        elapsed = self._time_eval(self._parse("CVM FRACTIO\nDORMI(S)\n"))
        self._assert_about_half_second(elapsed)

    def test_dormi_timing_frac_compiled(self):
        """Compiled ``DORMI(S)`` under ``CVM FRACTIO`` should take about half a second."""
        elapsed = self._time_compiled(self._parse("CVM FRACTIO\nDORMI(S)\n"))
        self._assert_about_half_second(elapsed)
|
|
|
|
class TestScripta(unittest.TestCase):
    """Tests for the SCRIBE, LEGE and ADIVNGE builtins used with ``CVM SCRIPTA``.

    Each test works on its own temporary file, which is removed again once
    the test finishes, whether it passed or failed.
    """

    @staticmethod
    def _remove_if_present(path):
        """Delete ``path`` if it still exists."""
        if os.path.exists(path):
            os.unlink(path)

    def _temp_path(self, content=None):
        """Create a temporary ``.txt`` file and return its path.

        ``content``, when given, is written to the file; otherwise the file
        is left empty.  Removal is registered with ``addCleanup`` so callers
        need no ``try``/``finally`` of their own.
        """
        with tempfile.NamedTemporaryFile(suffix=".txt", delete=False, mode="w") as handle:
            if content is not None:
                handle.write(content)
            path = handle.name
        self.addCleanup(self._remove_if_present, path)
        return path

    def _run_scripta(self, source):
        """Parse ``source`` and check that printing and re-parsing it is lossless.

        Returns the parsed program (not yet evaluated).
        """
        tokens = Lexer().get_lexer().lex(source + "\n")
        program = Parser().parse(tokens)
        # printer round-trip
        new_text = program.print()
        new_tokens = Lexer().get_lexer().lex(new_text + "\n")
        new_nodes = Parser().parse(new_tokens)
        self.assertEqual(program, new_nodes, f"Printer test\n{source}\n{new_text}")
        return program

    def _compile_and_run(self, program):
        """Compile ``program`` to C, build it with gcc, run it and return its stdout.

        Asserts that the built binary exits with status 0.  Raises
        ``subprocess.CalledProcessError`` if gcc fails.
        """
        c_source = compile_program(program)
        with tempfile.NamedTemporaryFile(suffix=".c", delete=False, mode="w") as tmp_c:
            tmp_c.write(c_source)
            tmp_c_path = tmp_c.name
        # Reserve a path for the output binary; gcc overwrites the empty file.
        with tempfile.NamedTemporaryFile(suffix="", delete=False) as tmp_bin:
            tmp_bin_path = tmp_bin.name
        try:
            subprocess.run(
                ["gcc", "-O2", tmp_c_path, _RUNTIME_C, "-o", tmp_bin_path, "-lcurl", "-lmicrohttpd"],
                check=True, capture_output=True,
            )
            proc = subprocess.run([tmp_bin_path], capture_output=True, text=True)
            self.assertEqual(proc.returncode, 0, f"Compiler binary exited non-zero:\n{proc.stderr}")
            return proc.stdout
        finally:
            os.unlink(tmp_c_path)
            os.unlink(tmp_bin_path)

    def test_scribe_and_lege(self):
        """Interpreter: text written with SCRIBE is printed back via DIC(LEGE(...))."""
        path = self._temp_path()
        source = f'CVM SCRIPTA\nSCRIBE("{path}", "SALVE MVNDE")\nDIC(LEGE("{path}"))'
        program = self._run_scripta(source)
        captured = StringIO()
        with patch("sys.stdout", captured):
            program.eval()
        self.assertEqual(captured.getvalue(), "SALVE MVNDE\n")
        with open(path) as written:
            self.assertEqual(written.read(), "SALVE MVNDE")

    def test_scribe_and_lege_compiled(self):
        """Compiled binary: text written with SCRIBE is printed back via DIC(LEGE(...))."""
        path = self._temp_path()
        source = f'CVM SCRIPTA\nSCRIBE("{path}", "SALVE MVNDE")\nDIC(LEGE("{path}"))'
        program = self._run_scripta(source)
        output = self._compile_and_run(program)
        self.assertEqual(output, "SALVE MVNDE\n")
        with open(path) as written:
            self.assertEqual(written.read(), "SALVE MVNDE")

    def test_adivnge(self):
        """Interpreter: ADIVNGE after SCRIBE yields the concatenated text."""
        path = self._temp_path()
        source = f'CVM SCRIPTA\nSCRIBE("{path}", "SALVE")\nADIVNGE("{path}", " MVNDE")\nDIC(LEGE("{path}"))'
        program = self._run_scripta(source)
        captured = StringIO()
        with patch("sys.stdout", captured):
            program.eval()
        self.assertEqual(captured.getvalue(), "SALVE MVNDE\n")

    def test_adivnge_compiled(self):
        """Compiled binary: ADIVNGE after SCRIBE yields the concatenated text."""
        path = self._temp_path()
        source = f'CVM SCRIPTA\nSCRIBE("{path}", "SALVE")\nADIVNGE("{path}", " MVNDE")\nDIC(LEGE("{path}"))'
        program = self._run_scripta(source)
        output = self._compile_and_run(program)
        self.assertEqual(output, "SALVE MVNDE\n")

    def test_scribe_overwrites(self):
        """A second SCRIBE to the same path replaces the first one's content."""
        path = self._temp_path()
        source = f'CVM SCRIPTA\nSCRIBE("{path}", "first")\nSCRIBE("{path}", "second")\nLEGE("{path}")'
        program = self._run_scripta(source)
        result = program.eval()
        self.assertEqual(result, ValStr("second"))

    def test_lege_empty_file(self):
        """LEGE on an empty file evaluates to an empty string value."""
        path = self._temp_path()
        source = f'CVM SCRIPTA\nLEGE("{path}")'
        program = self._run_scripta(source)
        result = program.eval()
        self.assertEqual(result, ValStr(""))

    def test_lege_preexisting_content(self):
        """LEGE returns content that was written to the file from Python."""
        path = self._temp_path("hello from python")
        source = f'CVM SCRIPTA\nLEGE("{path}")'
        program = self._run_scripta(source)
        result = program.eval()
        self.assertEqual(result, ValStr("hello from python"))

    def test_scribe_returns_nullus(self):
        """A program ending in SCRIBE evaluates to ValNul."""
        path = self._temp_path()
        source = f'CVM SCRIPTA\nSCRIBE("{path}", "x")'
        program = self._run_scripta(source)
        result = program.eval()
        self.assertEqual(result, ValNul())

    def test_adivnge_returns_nullus(self):
        """A program ending in ADIVNGE evaluates to ValNul."""
        path = self._temp_path()
        source = f'CVM SCRIPTA\nADIVNGE("{path}", "x")'
        program = self._run_scripta(source)
        result = program.eval()
        self.assertEqual(result, ValNul())
|
|
|
|
class TestRete(unittest.TestCase):
    """Tests for the PETE builtin against a local HTTP server.

    A throwaway ``http.server`` instance is started once for the class on an
    OS-assigned port; it answers every GET with the body ``SALVE MVNDE``.
    """

    @classmethod
    def setUpClass(cls):
        """Start the fixture HTTP server in a background daemon thread."""
        import http.server
        import threading

        class Handler(http.server.BaseHTTPRequestHandler):
            def do_GET(self):
                self.send_response(200)
                self.send_header("Content-Type", "text/plain")
                self.end_headers()
                self.wfile.write(b"SALVE MVNDE")

            def log_message(self, *args):
                # Silence the default per-request logging to stderr.
                pass

        # Port 0 asks the OS for any free port; read back the one assigned.
        cls.server = http.server.HTTPServer(("127.0.0.1", 0), Handler)
        cls.port = cls.server.server_address[1]
        cls.thread = threading.Thread(target=cls.server.serve_forever)
        cls.thread.daemon = True
        cls.thread.start()

    @classmethod
    def tearDownClass(cls):
        """Stop the fixture server and release its socket and thread."""
        cls.server.shutdown()
        # shutdown() only stops serve_forever(); the listening socket stays
        # open until server_close() is called.
        cls.server.server_close()
        cls.thread.join(timeout=5)

    def test_pete(self):
        """``PETE(url)`` evaluates to the response body."""
        url = f"http://127.0.0.1:{self.port}/"
        source = f'CVM RETE\nPETE("{url}")'
        run_test(
            self,
            source,
            Program([ModuleCall("RETE")], [ExpressionStatement(BuiltIn("PETE", [String(url)]))]),
            ValStr("SALVE MVNDE"),
        )

    def test_pete_dic(self):
        """``DIC(PETE(url))`` prints the response body followed by a newline."""
        url = f"http://127.0.0.1:{self.port}/"
        source = f'CVM RETE\nDIC(PETE("{url}"))'
        run_test(
            self,
            source,
            Program([ModuleCall("RETE")], [ExpressionStatement(BuiltIn("DIC", [BuiltIn("PETE", [String(url)])]))]),
            ValStr("SALVE MVNDE"),
            "SALVE MVNDE\n",
        )
|
|
|
|
class TestReteServer(unittest.TestCase):
    """Integration tests for PETITVR + AVSCVLTA server functionality."""

    def _wait_for_server(self, port, timeout=2.0):
        """Poll until the server is accepting connections.

        Fails the test if no TCP connection to ``127.0.0.1:port`` succeeds
        within ``timeout`` seconds.
        """
        import socket

        # monotonic() keeps the deadline immune to wall-clock adjustments.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                with socket.create_connection(("127.0.0.1", port), timeout=0.1):
                    return
            except OSError:
                time.sleep(0.05)
        self.fail(f"Server on port {port} did not start within {timeout}s")

    def _free_port(self):
        """Find a free port in range 1024-3999 (representable without MAGNVM)."""
        import random
        import socket

        for _ in range(100):
            port = random.randint(1024, 3999)
            try:
                # Binding succeeds only if nothing else holds the port right now;
                # the probe socket is closed again before the port is returned.
                with socket.socket() as s:
                    s.bind(("127.0.0.1", port))
                    return port
            except OSError:
                continue
        raise RuntimeError("Could not find a free port in range 1024-3999")

    def _run_server(self, source):
        """Parse ``source`` and start evaluating it in a daemon thread.

        Returns immediately after the thread is started; it does not wait
        for the server to come up.  Call ``_wait_for_server`` for that.
        """
        import threading

        tokens = Lexer().get_lexer().lex(source + "\n")
        program = Parser().parse(tokens)
        worker = threading.Thread(target=program.eval, daemon=True)
        worker.start()

    def _fetch(self, port, path="/", timeout=5.0):
        """GET ``path`` from the local server on ``port`` and return the decoded body.

        The response is closed before returning, and ``timeout`` bounds the
        request so a stalled server fails the test instead of hanging it.
        HTTP error statuses propagate as ``urllib.error.HTTPError``.
        """
        import urllib.request

        with urllib.request.urlopen(f"http://127.0.0.1:{port}{path}", timeout=timeout) as resp:
            return resp.read().decode()

    def test_basic_handler(self):
        """A handler registered for "/" serves its REDI value as the body."""
        port = self._free_port()
        source = (
            f'CVM RETE\n'
            f'PETITVR("/", FVNCTIO (petitio) VT {{\n'
            f'REDI("SALVE MVNDE")\n'
            f'}})\n'
            f'AVSCVLTA({int_to_num(port, False)})'
        )
        self._run_server(source)
        self._wait_for_server(port)
        self.assertEqual(self._fetch(port, "/"), "SALVE MVNDE")

    def test_multiple_routes(self):
        """Two registered routes each answer with their own handler's value."""
        port = self._free_port()
        source = (
            f'CVM RETE\n'
            f'PETITVR("/", FVNCTIO (p) VT {{\nREDI("RADIX")\n}})\n'
            f'PETITVR("/nomen", FVNCTIO (p) VT {{\nREDI("MARCVS")\n}})\n'
            f'AVSCVLTA({int_to_num(port, False)})'
        )
        self._run_server(source)
        self._wait_for_server(port)
        self.assertEqual(self._fetch(port, "/"), "RADIX")
        self.assertEqual(self._fetch(port, "/nomen"), "MARCVS")

    def test_404_unmatched(self):
        """A path with no registered route answers with HTTP 404."""
        port = self._free_port()
        source = (
            f'CVM RETE\n'
            f'PETITVR("/", FVNCTIO (p) VT {{\nREDI("ok")\n}})\n'
            f'AVSCVLTA({int_to_num(port, False)})'
        )
        self._run_server(source)
        self._wait_for_server(port)
        import urllib.error

        with self.assertRaises(urllib.error.HTTPError) as ctx:
            self._fetch(port, "/nonexistent")
        # HTTPError wraps the open response; close it once the test is done.
        self.addCleanup(ctx.exception.close)
        self.assertEqual(ctx.exception.code, 404)

    def test_request_dict_via(self):
        """The handler's argument exposes the request path under "via"."""
        port = self._free_port()
        source = (
            f'CVM RETE\n'
            f'PETITVR("/echo", FVNCTIO (petitio) VT {{\n'
            f'REDI(petitio["via"])\n'
            f'}})\n'
            f'AVSCVLTA({int_to_num(port, False)})'
        )
        self._run_server(source)
        self._wait_for_server(port)
        self.assertEqual(self._fetch(port, "/echo"), "/echo")

    def test_request_dict_quaestio(self):
        """The handler's argument exposes the query string under "quaestio"."""
        port = self._free_port()
        source = (
            f'CVM RETE\n'
            f'PETITVR("/q", FVNCTIO (petitio) VT {{\n'
            f'REDI(petitio["quaestio"])\n'
            f'}})\n'
            f'AVSCVLTA({int_to_num(port, False)})'
        )
        self._run_server(source)
        self._wait_for_server(port)
        self.assertEqual(self._fetch(port, "/q?nomen=Marcus"), "nomen=Marcus")

    def test_petitvr_stores_route(self):
        """PETITVR alone (without AVSCVLTA) just stores a route and returns NVLLVS."""
        source = 'CVM RETE\nPETITVR("/", FVNCTIO (p) VT {\nREDI("hi")\n})'
        run_test(
            self,
            source,
            Program([ModuleCall("RETE")], [ExpressionStatement(BuiltIn("PETITVR", [
                String("/"),
                Fvnctio([ID("p")], [Redi([String("hi")])]),
            ]))]),
            ValNul(),
        )
|