🐐 Web server
This commit is contained in:
202
tests.py
202
tests.py
@@ -1,5 +1,4 @@
|
||||
import os
|
||||
import random
|
||||
import subprocess
|
||||
import tempfile
|
||||
import time
|
||||
@@ -17,6 +16,7 @@ from centvrion.ast_nodes import (
|
||||
ModuleCall, Nullus, Numeral, PerStatement, Program, Redi, SiStatement,
|
||||
String, TemptaStatement, UnaryMinus, UnaryNot, Fractio, frac_to_fraction,
|
||||
fraction_to_frac, num_to_int, int_to_num, make_string,
|
||||
_cent_rng,
|
||||
)
|
||||
from centvrion.compiler.emitter import compile_program
|
||||
from centvrion.errors import CentvrionError
|
||||
@@ -30,7 +30,7 @@ _RUNTIME_C = os.path.join(
|
||||
)
|
||||
|
||||
def run_test(self, source, target_nodes, target_value, target_output="", input_lines=[]):
|
||||
random.seed(1)
|
||||
_cent_rng.seed(1)
|
||||
|
||||
lexer = Lexer().get_lexer()
|
||||
tokens = lexer.lex(source + "\n")
|
||||
@@ -83,6 +83,8 @@ def run_test(self, source, target_nodes, target_value, target_output="", input_l
|
||||
###### Compiler Test #####
|
||||
##########################
|
||||
c_source = compile_program(program)
|
||||
# Force deterministic RNG seed=1 for test reproducibility
|
||||
c_source = c_source.replace("cent_init();", "cent_init(); cent_semen((CentValue){.type=CENT_INT, .ival=1});", 1)
|
||||
with tempfile.NamedTemporaryFile(suffix=".c", delete=False, mode="w") as tmp_c:
|
||||
tmp_c.write(c_source)
|
||||
tmp_c_path = tmp_c.name
|
||||
@@ -90,7 +92,7 @@ def run_test(self, source, target_nodes, target_value, target_output="", input_l
|
||||
tmp_bin_path = tmp_bin.name
|
||||
try:
|
||||
subprocess.run(
|
||||
["gcc", "-O2", tmp_c_path, _RUNTIME_C, "-o", tmp_bin_path],
|
||||
["gcc", "-O2", tmp_c_path, _RUNTIME_C, "-o", tmp_bin_path, "-lcurl", "-lmicrohttpd"],
|
||||
check=True, capture_output=True,
|
||||
)
|
||||
stdin_data = "".join(f"{l}\n" for l in input_lines)
|
||||
@@ -530,22 +532,22 @@ class TestFunctions(unittest.TestCase):
|
||||
builtin_tests = [
|
||||
("AVDI_NVMERVS()", Program([], [ExpressionStatement(BuiltIn("AVDI_NVMERVS", []))]), ValInt(3), "", ["III"]),
|
||||
("AVDI_NVMERVS()", Program([], [ExpressionStatement(BuiltIn("AVDI_NVMERVS", []))]), ValInt(10), "", ["X"]),
|
||||
("CVM FORS\nFORTVITVS_NVMERVS(I, X)", Program([ModuleCall("FORS")], [ExpressionStatement(BuiltIn("FORTVITVS_NVMERVS", [Numeral("I"), Numeral("X")]))]), ValInt(3)),
|
||||
("CVM FORS\nDESIGNA a VT [I, II, III]\nDIC(a[FORTVITVS_NVMERVS(I, LONGITVDO(a))])", Program([ModuleCall("FORS")], [Designa(ID("a"), DataArray([Numeral("I"), Numeral("II"), Numeral("III")])), ExpressionStatement(BuiltIn("DIC", [ArrayIndex(ID("a"), BuiltIn("FORTVITVS_NVMERVS", [Numeral("I"), BuiltIn("LONGITVDO", [ID("a")])]))]))]), ValStr("I"), "I\n"),
|
||||
("AVDI()", Program([], [ExpressionStatement(BuiltIn("AVDI", []))]), ValStr("hello"), "", ["hello"]),
|
||||
("LONGITVDO([I, II, III])", Program([], [ExpressionStatement(BuiltIn("LONGITVDO", [DataArray([Numeral("I"), Numeral("II"), Numeral("III")])]))]), ValInt(3)),
|
||||
("LONGITVDO([])", Program([], [ExpressionStatement(BuiltIn("LONGITVDO", [DataArray([])]))]), ValInt(0)),
|
||||
('LONGITVDO("salve")', Program([], [ExpressionStatement(BuiltIn("LONGITVDO", [String("salve")]))]), ValInt(5)),
|
||||
('LONGITVDO("")', Program([], [ExpressionStatement(BuiltIn("LONGITVDO", [String("")]))]), ValInt(0)),
|
||||
("CVM FORS\nFORTVITA_ELECTIO([I, II, III])", Program([ModuleCall("FORS")], [ExpressionStatement(BuiltIn("FORTVITA_ELECTIO", [DataArray([Numeral("I"), Numeral("II"), Numeral("III")])]))]), ValInt(1)),
|
||||
("CVM FORS\nSEMEN(XLII)\nFORTVITVS_NVMERVS(I, C)", Program([ModuleCall("FORS")], [ExpressionStatement(BuiltIn("SEMEN", [Numeral("XLII")])), ExpressionStatement(BuiltIn("FORTVITVS_NVMERVS", [Numeral("I"), Numeral("C")]))]), ValInt(82)),
|
||||
# DECIMATIO: seed 42, 10 elements → removes 1 (element II)
|
||||
("CVM FORS\nSEMEN(XLII)\nDECIMATIO([I, II, III, IV, V, VI, VII, VIII, IX, X])", Program([ModuleCall("FORS")], [ExpressionStatement(BuiltIn("SEMEN", [Numeral("XLII")])), ExpressionStatement(BuiltIn("DECIMATIO", [DataArray([Numeral("I"), Numeral("II"), Numeral("III"), Numeral("IV"), Numeral("V"), Numeral("VI"), Numeral("VII"), Numeral("VIII"), Numeral("IX"), Numeral("X")])]))]), ValList([ValInt(1), ValInt(3), ValInt(4), ValInt(5), ValInt(6), ValInt(7), ValInt(8), ValInt(9), ValInt(10)])),
|
||||
("CVM FORS\nDIC(FORTVITA_ELECTIO([I, II, III]))", Program([ModuleCall("FORS")], [ExpressionStatement(BuiltIn("DIC", [BuiltIn("FORTVITA_ELECTIO", [DataArray([Numeral("I"), Numeral("II"), Numeral("III")])])]))]), ValStr("I"), "I\n"),
|
||||
("CVM FORS\nSEMEN(XLII)\nDIC(FORTVITVS_NVMERVS(I, C))", Program([ModuleCall("FORS")], [ExpressionStatement(BuiltIn("SEMEN", [Numeral("XLII")])), ExpressionStatement(BuiltIn("DIC", [BuiltIn("FORTVITVS_NVMERVS", [Numeral("I"), Numeral("C")])]))]), ValStr("XXXIII"), "XXXIII\n"),
|
||||
# DECIMATIO: seed 42, 10 elements → removes 1 (element III)
|
||||
("CVM FORS\nSEMEN(XLII)\nDIC(DECIMATIO([I, II, III, IV, V, VI, VII, VIII, IX, X]))", Program([ModuleCall("FORS")], [ExpressionStatement(BuiltIn("SEMEN", [Numeral("XLII")])), ExpressionStatement(BuiltIn("DIC", [BuiltIn("DECIMATIO", [DataArray([Numeral("I"), Numeral("II"), Numeral("III"), Numeral("IV"), Numeral("V"), Numeral("VI"), Numeral("VII"), Numeral("VIII"), Numeral("IX"), Numeral("X")])])]))]), ValStr("[I II IV V VI VII VIII IX X]"), "[I II IV V VI VII VIII IX X]\n"),
|
||||
# DECIMATIO: seed 1, 3 elements → 3//10=0, nothing removed
|
||||
("CVM FORS\nSEMEN(I)\nDECIMATIO([I, II, III])", Program([ModuleCall("FORS")], [ExpressionStatement(BuiltIn("SEMEN", [Numeral("I")])), ExpressionStatement(BuiltIn("DECIMATIO", [DataArray([Numeral("I"), Numeral("II"), Numeral("III")])]))]), ValList([ValInt(1), ValInt(2), ValInt(3)])),
|
||||
("CVM FORS\nSEMEN(I)\nDIC(DECIMATIO([I, II, III]))", Program([ModuleCall("FORS")], [ExpressionStatement(BuiltIn("SEMEN", [Numeral("I")])), ExpressionStatement(BuiltIn("DIC", [BuiltIn("DECIMATIO", [DataArray([Numeral("I"), Numeral("II"), Numeral("III")])])]))]), ValStr("[I II III]"), "[I II III]\n"),
|
||||
# DECIMATIO: empty array → empty array
|
||||
("CVM FORS\nDECIMATIO([])", Program([ModuleCall("FORS")], [ExpressionStatement(BuiltIn("DECIMATIO", [DataArray([])]))]), ValList([])),
|
||||
# DECIMATIO: seed 42, 20 elements → removes 2 (elements I and IV)
|
||||
("CVM FORS\nSEMEN(XLII)\nDECIMATIO([I, II, III, IV, V, VI, VII, VIII, IX, X, XI, XII, XIII, XIV, XV, XVI, XVII, XVIII, XIX, XX])", Program([ModuleCall("FORS")], [ExpressionStatement(BuiltIn("SEMEN", [Numeral("XLII")])), ExpressionStatement(BuiltIn("DECIMATIO", [DataArray([Numeral("I"), Numeral("II"), Numeral("III"), Numeral("IV"), Numeral("V"), Numeral("VI"), Numeral("VII"), Numeral("VIII"), Numeral("IX"), Numeral("X"), Numeral("XI"), Numeral("XII"), Numeral("XIII"), Numeral("XIV"), Numeral("XV"), Numeral("XVI"), Numeral("XVII"), Numeral("XVIII"), Numeral("XIX"), Numeral("XX")])]))]), ValList([ValInt(2), ValInt(3), ValInt(5), ValInt(6), ValInt(7), ValInt(8), ValInt(9), ValInt(10), ValInt(11), ValInt(12), ValInt(13), ValInt(14), ValInt(15), ValInt(16), ValInt(17), ValInt(18), ValInt(19), ValInt(20)])),
|
||||
("CVM FORS\nDIC(DECIMATIO([]))", Program([ModuleCall("FORS")], [ExpressionStatement(BuiltIn("DIC", [BuiltIn("DECIMATIO", [DataArray([])])]))]), ValStr("[]"), "[]\n"),
|
||||
# DECIMATIO: seed 42, 20 elements → removes 2 (elements XIII and XII)
|
||||
("CVM FORS\nSEMEN(XLII)\nDIC(DECIMATIO([I, II, III, IV, V, VI, VII, VIII, IX, X, XI, XII, XIII, XIV, XV, XVI, XVII, XVIII, XIX, XX]))", Program([ModuleCall("FORS")], [ExpressionStatement(BuiltIn("SEMEN", [Numeral("XLII")])), ExpressionStatement(BuiltIn("DIC", [BuiltIn("DECIMATIO", [DataArray([Numeral("I"), Numeral("II"), Numeral("III"), Numeral("IV"), Numeral("V"), Numeral("VI"), Numeral("VII"), Numeral("VIII"), Numeral("IX"), Numeral("X"), Numeral("XI"), Numeral("XII"), Numeral("XIII"), Numeral("XIV"), Numeral("XV"), Numeral("XVI"), Numeral("XVII"), Numeral("XVIII"), Numeral("XIX"), Numeral("XX")])])]))]), ValStr("[I II III IV V VI VII VIII IX X XI XIV XV XVI XVII XVIII XIX XX]"), "[I II III IV V VI VII VIII IX X XI XIV XV XVI XVII XVIII XIX XX]\n"),
|
||||
# SENATVS: majority true → VERITAS
|
||||
("SENATVS(VERITAS, VERITAS, FALSITAS)", Program([], [ExpressionStatement(BuiltIn("SENATVS", [Bool(True), Bool(True), Bool(False)]))]), ValBool(True)),
|
||||
# SENATVS: majority false → FALSITAS
|
||||
@@ -724,6 +726,14 @@ error_tests = [
|
||||
('SVBSTITVE("a", I, "c")', CentvrionError), # SVBSTITVE requires strings, not int replacement
|
||||
('SVBSTITVE("a", "b", I)', CentvrionError), # SVBSTITVE requires strings, not int text
|
||||
('SVBSTITVE("[", "b", "c")', CentvrionError), # SVBSTITVE invalid regex
|
||||
('PETE("http://example.com")', CentvrionError), # RETE required for PETE
|
||||
('CVM RETE\nPETE(I)', CentvrionError), # PETE requires a string URL
|
||||
('PETITVR("/", FVNCTIO (r) VT {\nREDI("hi")\n})', CentvrionError), # RETE required for PETITVR
|
||||
('CVM RETE\nPETITVR(I, FVNCTIO (r) VT {\nREDI("hi")\n})', CentvrionError), # PETITVR path must be string
|
||||
('CVM RETE\nPETITVR("/", "not a func")', CentvrionError), # PETITVR handler must be function
|
||||
('CVM RETE\nAVSCVLTA(LXXX)', CentvrionError), # AVSCVLTA: no routes registered
|
||||
('AVSCVLTA(LXXX)', CentvrionError), # RETE required for AVSCVLTA
|
||||
('CVM RETE\nPETITVR("/", FVNCTIO (r) VT {\nREDI("hi")\n})\nAVSCVLTA("text")', CentvrionError), # AVSCVLTA port must be integer
|
||||
]
|
||||
|
||||
class TestErrors(unittest.TestCase):
|
||||
@@ -748,7 +758,7 @@ def run_compiler_error_test(self, source):
|
||||
tmp_bin_path = tmp_bin.name
|
||||
try:
|
||||
subprocess.run(
|
||||
["gcc", "-O2", tmp_c_path, _RUNTIME_C, "-o", tmp_bin_path],
|
||||
["gcc", "-O2", tmp_c_path, _RUNTIME_C, "-o", tmp_bin_path, "-lcurl", "-lmicrohttpd"],
|
||||
check=True, capture_output=True,
|
||||
)
|
||||
proc = subprocess.run([tmp_bin_path], capture_output=True, text=True)
|
||||
@@ -2483,7 +2493,7 @@ class TestDormi(unittest.TestCase):
|
||||
tmp_bin_path = tmp_bin.name
|
||||
try:
|
||||
subprocess.run(
|
||||
["gcc", "-O2", tmp_c_path, _RUNTIME_C, "-o", tmp_bin_path],
|
||||
["gcc", "-O2", tmp_c_path, _RUNTIME_C, "-o", tmp_bin_path, "-lcurl", "-lmicrohttpd"],
|
||||
check=True, capture_output=True,
|
||||
)
|
||||
start = time.time()
|
||||
@@ -2520,7 +2530,7 @@ class TestDormi(unittest.TestCase):
|
||||
tmp_bin_path = tmp_bin.name
|
||||
try:
|
||||
subprocess.run(
|
||||
["gcc", "-O2", tmp_c_path, _RUNTIME_C, "-o", tmp_bin_path],
|
||||
["gcc", "-O2", tmp_c_path, _RUNTIME_C, "-o", tmp_bin_path, "-lcurl", "-lmicrohttpd"],
|
||||
check=True, capture_output=True,
|
||||
)
|
||||
start = time.time()
|
||||
@@ -2554,7 +2564,7 @@ class TestScripta(unittest.TestCase):
|
||||
tmp_bin_path = tmp_bin.name
|
||||
try:
|
||||
subprocess.run(
|
||||
["gcc", "-O2", tmp_c_path, _RUNTIME_C, "-o", tmp_bin_path],
|
||||
["gcc", "-O2", tmp_c_path, _RUNTIME_C, "-o", tmp_bin_path, "-lcurl", "-lmicrohttpd"],
|
||||
check=True, capture_output=True,
|
||||
)
|
||||
proc = subprocess.run([tmp_bin_path], capture_output=True, text=True)
|
||||
@@ -2797,5 +2807,165 @@ class TestTempta(unittest.TestCase):
|
||||
run_test(self, source, nodes, value, output)
|
||||
|
||||
|
||||
class TestRete(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
import http.server, threading
|
||||
class Handler(http.server.BaseHTTPRequestHandler):
|
||||
def do_GET(self):
|
||||
self.send_response(200)
|
||||
self.send_header("Content-Type", "text/plain")
|
||||
self.end_headers()
|
||||
self.wfile.write(b"SALVE MVNDE")
|
||||
def log_message(self, *args):
|
||||
pass
|
||||
cls.server = http.server.HTTPServer(("127.0.0.1", 0), Handler)
|
||||
cls.port = cls.server.server_address[1]
|
||||
cls.thread = threading.Thread(target=cls.server.serve_forever)
|
||||
cls.thread.daemon = True
|
||||
cls.thread.start()
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
cls.server.shutdown()
|
||||
|
||||
def test_pete(self):
|
||||
url = f"http://127.0.0.1:{self.port}/"
|
||||
source = f'CVM RETE\nPETE("{url}")'
|
||||
run_test(self, source,
|
||||
Program([ModuleCall("RETE")], [ExpressionStatement(BuiltIn("PETE", [String(url)]))]),
|
||||
ValStr("SALVE MVNDE"))
|
||||
|
||||
def test_pete_dic(self):
|
||||
url = f"http://127.0.0.1:{self.port}/"
|
||||
source = f'CVM RETE\nDIC(PETE("{url}"))'
|
||||
run_test(self, source,
|
||||
Program([ModuleCall("RETE")], [ExpressionStatement(BuiltIn("DIC", [BuiltIn("PETE", [String(url)])]))]),
|
||||
ValStr("SALVE MVNDE"), "SALVE MVNDE\n")
|
||||
|
||||
|
||||
class TestReteServer(unittest.TestCase):
|
||||
"""Integration tests for PETITVR + AVSCVLTA server functionality."""
|
||||
|
||||
def _wait_for_server(self, port, timeout=2.0):
|
||||
"""Poll until the server is accepting connections."""
|
||||
import socket
|
||||
deadline = time.time() + timeout
|
||||
while time.time() < deadline:
|
||||
try:
|
||||
with socket.create_connection(("127.0.0.1", port), timeout=0.1):
|
||||
return
|
||||
except OSError:
|
||||
time.sleep(0.05)
|
||||
self.fail(f"Server on port {port} did not start within {timeout}s")
|
||||
|
||||
def _free_port(self):
|
||||
"""Find a free port in range 1024-3999 (representable without MAGNVM)."""
|
||||
import socket, random
|
||||
for _ in range(100):
|
||||
port = random.randint(1024, 3999)
|
||||
try:
|
||||
with socket.socket() as s:
|
||||
s.bind(("127.0.0.1", port))
|
||||
return port
|
||||
except OSError:
|
||||
continue
|
||||
raise RuntimeError("Could not find a free port in range 1024-3999")
|
||||
|
||||
def _run_server(self, source):
|
||||
"""Parse and eval source in a daemon thread. Returns when server is ready."""
|
||||
import threading
|
||||
lexer = Lexer().get_lexer()
|
||||
tokens = lexer.lex(source + "\n")
|
||||
program = Parser().parse(tokens)
|
||||
t = threading.Thread(target=program.eval, daemon=True)
|
||||
t.start()
|
||||
|
||||
def test_basic_handler(self):
|
||||
port = self._free_port()
|
||||
source = (
|
||||
f'CVM RETE\n'
|
||||
f'PETITVR("/", FVNCTIO (petitio) VT {{\n'
|
||||
f'REDI("SALVE MVNDE")\n'
|
||||
f'}})\n'
|
||||
f'AVSCVLTA({int_to_num(port, False)})'
|
||||
)
|
||||
self._run_server(source)
|
||||
self._wait_for_server(port)
|
||||
import urllib.request
|
||||
resp = urllib.request.urlopen(f"http://127.0.0.1:{port}/")
|
||||
self.assertEqual(resp.read().decode(), "SALVE MVNDE")
|
||||
|
||||
def test_multiple_routes(self):
|
||||
port = self._free_port()
|
||||
source = (
|
||||
f'CVM RETE\n'
|
||||
f'PETITVR("/", FVNCTIO (p) VT {{\nREDI("RADIX")\n}})\n'
|
||||
f'PETITVR("/nomen", FVNCTIO (p) VT {{\nREDI("MARCVS")\n}})\n'
|
||||
f'AVSCVLTA({int_to_num(port, False)})'
|
||||
)
|
||||
self._run_server(source)
|
||||
self._wait_for_server(port)
|
||||
import urllib.request
|
||||
resp1 = urllib.request.urlopen(f"http://127.0.0.1:{port}/")
|
||||
self.assertEqual(resp1.read().decode(), "RADIX")
|
||||
resp2 = urllib.request.urlopen(f"http://127.0.0.1:{port}/nomen")
|
||||
self.assertEqual(resp2.read().decode(), "MARCVS")
|
||||
|
||||
def test_404_unmatched(self):
|
||||
port = self._free_port()
|
||||
source = (
|
||||
f'CVM RETE\n'
|
||||
f'PETITVR("/", FVNCTIO (p) VT {{\nREDI("ok")\n}})\n'
|
||||
f'AVSCVLTA({int_to_num(port, False)})'
|
||||
)
|
||||
self._run_server(source)
|
||||
self._wait_for_server(port)
|
||||
import urllib.request, urllib.error
|
||||
with self.assertRaises(urllib.error.HTTPError) as ctx:
|
||||
urllib.request.urlopen(f"http://127.0.0.1:{port}/nonexistent")
|
||||
self.assertEqual(ctx.exception.code, 404)
|
||||
|
||||
def test_request_dict_via(self):
|
||||
port = self._free_port()
|
||||
source = (
|
||||
f'CVM RETE\n'
|
||||
f'PETITVR("/echo", FVNCTIO (petitio) VT {{\n'
|
||||
f'REDI(petitio["via"])\n'
|
||||
f'}})\n'
|
||||
f'AVSCVLTA({int_to_num(port, False)})'
|
||||
)
|
||||
self._run_server(source)
|
||||
self._wait_for_server(port)
|
||||
import urllib.request
|
||||
resp = urllib.request.urlopen(f"http://127.0.0.1:{port}/echo")
|
||||
self.assertEqual(resp.read().decode(), "/echo")
|
||||
|
||||
def test_request_dict_quaestio(self):
|
||||
port = self._free_port()
|
||||
source = (
|
||||
f'CVM RETE\n'
|
||||
f'PETITVR("/q", FVNCTIO (petitio) VT {{\n'
|
||||
f'REDI(petitio["quaestio"])\n'
|
||||
f'}})\n'
|
||||
f'AVSCVLTA({int_to_num(port, False)})'
|
||||
)
|
||||
self._run_server(source)
|
||||
self._wait_for_server(port)
|
||||
import urllib.request
|
||||
resp = urllib.request.urlopen(f"http://127.0.0.1:{port}/q?nomen=Marcus")
|
||||
self.assertEqual(resp.read().decode(), "nomen=Marcus")
|
||||
|
||||
def test_petitvr_stores_route(self):
|
||||
"""PETITVR alone (without AVSCVLTA) just stores a route and returns NVLLVS."""
|
||||
source = 'CVM RETE\nPETITVR("/", FVNCTIO (p) VT {\nREDI("hi")\n})'
|
||||
run_test(self, source,
|
||||
Program([ModuleCall("RETE")], [ExpressionStatement(BuiltIn("PETITVR", [
|
||||
String("/"),
|
||||
Fvnctio([ID("p")], [Redi([String("hi")])])
|
||||
]))]),
|
||||
ValNul())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user