from tests._helpers import (
    unittest, parameterized, Fraction, time,
    run_test, run_compiler_error_test,
    ArrayIndex, ArraySlice, Bool, BinOp, BuiltIn, DataArray, DataDict, DataRangeArray,
    Defini, Continva, Designa, DesignaDestructure, DesignaIndex, DumStatement,
    Erumpe, ExpressionStatement, Fvnctio, ID, InterpolatedString, Invoca,
    ModuleCall, Nullus, Numeral, PerStatement, Program, Redi, SiStatement,
    String, TemptaStatement, UnaryMinus, UnaryNot,
    Fractio, frac_to_fraction, fraction_to_frac,
    num_to_int, int_to_num, make_string,
    ValInt, ValStr, ValBool, ValList, ValDict, ValNul, ValFunc, ValFrac,
    CentvrionError, _RUNTIME_C, _cent_rng,
    Lexer, Parser, compile_program,
    os, subprocess, tempfile, StringIO, patch,
)


def _parse(source):
    """Lex and parse *source*, returning the resulting program node."""
    tokens = Lexer().get_lexer().lex(source)
    return Parser().parse(tokens)


def _build_and_run(program):
    """Compile *program* to C, build it with gcc, run the binary, and time the run.

    Returns a ``(CompletedProcess, elapsed_seconds)`` pair. The elapsed time
    covers only the execution of the built binary, not code generation or gcc.
    Raises ``subprocess.CalledProcessError`` if gcc fails.
    """
    c_source = compile_program(program)
    # A temporary directory guarantees both the C file and the binary are
    # removed on every exit path, including a failed gcc invocation.
    with tempfile.TemporaryDirectory() as workdir:
        c_path = os.path.join(workdir, "program.c")
        bin_path = os.path.join(workdir, "program")
        with open(c_path, "w") as c_file:
            c_file.write(c_source)
        subprocess.run(
            ["gcc", "-O2", c_path, _RUNTIME_C, "-o", bin_path, "-lcurl", "-lmicrohttpd"],
            check=True, capture_output=True,
        )
        start = time.monotonic()
        proc = subprocess.run([bin_path], capture_output=True, text=True)
        elapsed = time.monotonic() - start
    return proc, elapsed


def _unlink_if_exists(path):
    """Remove *path*, ignoring the case where it is already gone."""
    try:
        os.unlink(path)
    except FileNotFoundError:
        pass


# --- DORMI ---

dormi_tests = [
    ("DORMI(NVLLVS)",
     Program([], [ExpressionStatement(BuiltIn("DORMI", [Nullus()]))]),
     ValNul()),
]


class TestDormi(unittest.TestCase):
    """DORMI tests: the parameterized case plus timing checks (interpreted and compiled)."""

    @parameterized.expand(dormi_tests)
    def test_dormi(self, source, nodes, value, output=""):
        run_test(self, source, nodes, value, output)

    @staticmethod
    def _timed_eval(source):
        """Evaluate *source* in the interpreter and return the elapsed seconds."""
        program = _parse(source)
        start = time.monotonic()
        program.eval()
        return time.monotonic() - start

    def _timed_compiled(self, source):
        """Build and run *source* as a native binary; return the run's elapsed seconds."""
        proc, elapsed = _build_and_run(_parse(source))
        self.assertEqual(proc.returncode, 0)
        return elapsed

    def _assert_elapsed_between(self, elapsed, minimum, maximum):
        """Assert that *elapsed* lies in the closed interval [minimum, maximum]."""
        self.assertGreaterEqual(elapsed, minimum, f"finished too quickly: {elapsed:.3f}s")
        self.assertLessEqual(elapsed, maximum, f"took too long: {elapsed:.3f}s")

    def test_dormi_timing_int(self):
        self._assert_elapsed_between(self._timed_eval("DORMI(I)\n"), 0.5, 1.5)

    def test_dormi_timing_int_compiled(self):
        self._assert_elapsed_between(self._timed_compiled("DORMI(I)\n"), 0.5, 1.5)

    def test_dormi_timing_frac(self):
        # The lower bound must be above zero, otherwise a DORMI that never
        # sleeps would still pass; the upper bound is unchanged at 1.0s.
        elapsed = self._timed_eval("CVM FRACTIO\nDORMI(S)\n")
        self._assert_elapsed_between(elapsed, 0.4, 1.0)

    def test_dormi_timing_frac_compiled(self):
        # Same reasoning as the interpreted variant: require a real sleep.
        elapsed = self._timed_compiled("CVM FRACTIO\nDORMI(S)\n")
        self._assert_elapsed_between(elapsed, 0.4, 1.0)


class TestScripta(unittest.TestCase):
    """Tests for the SCRIPTA module calls SCRIBE, ADIVNGE and LEGE."""

    def _run_scripta(self, source):
        """Parse *source*, check that printing round-trips, and return the program."""
        program = _parse(source + "\n")
        # printer round-trip
        new_text = program.print()
        new_nodes = _parse(new_text + "\n")
        self.assertEqual(program, new_nodes, f"Printer test\n{source}\n{new_text}")
        return program

    def _compile_and_run(self, program):
        """Build *program* natively, assert a zero exit code, and return its stdout."""
        proc, _ = _build_and_run(program)
        self.assertEqual(proc.returncode, 0, f"Compiler binary exited non-zero:\n{proc.stderr}")
        return proc.stdout

    def _temp_file(self, content=""):
        """Create a ``.txt`` temp file holding *content*; it is removed after the test."""
        handle = tempfile.NamedTemporaryFile(suffix=".txt", delete=False, mode="w")
        # Register cleanup before writing so the file is removed even if the write fails.
        self.addCleanup(_unlink_if_exists, handle.name)
        with handle:
            handle.write(content)
        return handle.name

    @staticmethod
    def _eval_stdout(program):
        """Evaluate *program* with ``sys.stdout`` patched; return what it printed."""
        captured = StringIO()
        with patch("sys.stdout", captured):
            program.eval()
        return captured.getvalue()

    def test_scribe_and_lege(self):
        path = self._temp_file()
        source = f'CVM SCRIPTA\nSCRIBE("{path}", "SALVE MVNDE")\nDIC(LEGE("{path}"))'
        program = self._run_scripta(source)
        self.assertEqual(self._eval_stdout(program), "SALVE MVNDE\n")
        with open(path) as f:
            self.assertEqual(f.read(), "SALVE MVNDE")

    def test_scribe_and_lege_compiled(self):
        path = self._temp_file()
        source = f'CVM SCRIPTA\nSCRIBE("{path}", "SALVE MVNDE")\nDIC(LEGE("{path}"))'
        program = self._run_scripta(source)
        self.assertEqual(self._compile_and_run(program), "SALVE MVNDE\n")
        with open(path) as f:
            self.assertEqual(f.read(), "SALVE MVNDE")

    def test_adivnge(self):
        path = self._temp_file()
        source = f'CVM SCRIPTA\nSCRIBE("{path}", "SALVE")\nADIVNGE("{path}", " MVNDE")\nDIC(LEGE("{path}"))'
        program = self._run_scripta(source)
        self.assertEqual(self._eval_stdout(program), "SALVE MVNDE\n")

    def test_adivnge_compiled(self):
        path = self._temp_file()
        source = f'CVM SCRIPTA\nSCRIBE("{path}", "SALVE")\nADIVNGE("{path}", " MVNDE")\nDIC(LEGE("{path}"))'
        program = self._run_scripta(source)
        self.assertEqual(self._compile_and_run(program), "SALVE MVNDE\n")

    def test_scribe_overwrites(self):
        path = self._temp_file()
        source = f'CVM SCRIPTA\nSCRIBE("{path}", "first")\nSCRIBE("{path}", "second")\nLEGE("{path}")'
        program = self._run_scripta(source)
        self.assertEqual(program.eval(), ValStr("second"))

    def test_lege_empty_file(self):
        path = self._temp_file()
        source = f'CVM SCRIPTA\nLEGE("{path}")'
        program = self._run_scripta(source)
        self.assertEqual(program.eval(), ValStr(""))

    def test_lege_preexisting_content(self):
        path = self._temp_file("hello from python")
        source = f'CVM SCRIPTA\nLEGE("{path}")'
        program = self._run_scripta(source)
        self.assertEqual(program.eval(), ValStr("hello from python"))

    def test_scribe_returns_nullus(self):
        path = self._temp_file()
        source = f'CVM SCRIPTA\nSCRIBE("{path}", "x")'
        program = self._run_scripta(source)
        self.assertEqual(program.eval(), ValNul())

    def test_adivnge_returns_nullus(self):
        path = self._temp_file()
        source = f'CVM SCRIPTA\nADIVNGE("{path}", "x")'
        program = self._run_scripta(source)
        self.assertEqual(program.eval(), ValNul())


class TestRete(unittest.TestCase):
    """PETE tests run against a local HTTP server started once for the class."""

    @classmethod
    def setUpClass(cls):
        import http.server
        import threading

        class Handler(http.server.BaseHTTPRequestHandler):
            def do_GET(self):
                self.send_response(200)
                self.send_header("Content-Type", "text/plain")
                self.end_headers()
                self.wfile.write(b"SALVE MVNDE")

            def log_message(self, *args):
                # Silence per-request logging in test output.
                pass

        # Port 0 lets the OS choose a free port; read it back for the tests.
        cls.server = http.server.HTTPServer(("127.0.0.1", 0), Handler)
        cls.port = cls.server.server_address[1]
        cls.thread = threading.Thread(target=cls.server.serve_forever)
        cls.thread.daemon = True
        cls.thread.start()

    @classmethod
    def tearDownClass(cls):
        # shutdown() only stops the serve_forever loop; server_close() is what
        # releases the listening socket.
        cls.server.shutdown()
        cls.server.server_close()
        cls.thread.join(timeout=2.0)

    def test_pete(self):
        url = f"http://127.0.0.1:{self.port}/"
        source = f'CVM RETE\nPETE("{url}")'
        run_test(self, source,
                 Program([ModuleCall("RETE")],
                         [ExpressionStatement(BuiltIn("PETE", [String(url)]))]),
                 ValStr("SALVE MVNDE"))

    def test_pete_dic(self):
        url = f"http://127.0.0.1:{self.port}/"
        source = f'CVM RETE\nDIC(PETE("{url}"))'
        run_test(self, source,
                 Program([ModuleCall("RETE")],
                         [ExpressionStatement(BuiltIn("DIC", [BuiltIn("PETE", [String(url)])]))]),
                 ValStr("SALVE MVNDE"), "SALVE MVNDE\n")


class TestReteServer(unittest.TestCase):
    """Integration tests for PETITVR + AVSCVLTA server functionality."""

    # Upper bound for each HTTP request so a wedged server cannot hang the suite.
    _REQUEST_TIMEOUT = 5.0

    def _wait_for_server(self, port, timeout=2.0):
        """Poll until the server is accepting connections."""
        import socket

        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                with socket.create_connection(("127.0.0.1", port), timeout=0.1):
                    return
            except OSError:
                time.sleep(0.05)
        self.fail(f"Server on port {port} did not start within {timeout}s")

    def _free_port(self):
        """Find a free port in range 1024-3999 (representable without MAGNVM)."""
        import random
        import socket

        for _ in range(100):
            port = random.randint(1024, 3999)
            try:
                with socket.socket() as s:
                    s.bind(("127.0.0.1", port))
                    return port
            except OSError:
                continue
        raise RuntimeError("Could not find a free port in range 1024-3999")

    def _run_server(self, source):
        """Parse *source* and start evaluating it in a daemon thread.

        Returns immediately after the thread starts; call ``_wait_for_server``
        to block until the server accepts connections.
        """
        import threading

        program = _parse(source + "\n")
        t = threading.Thread(target=program.eval, daemon=True)
        t.start()

    def _start_server(self, routes):
        """Serve the PETITVR statements in *routes* on a fresh port; return the port.

        *routes* must end with a newline; the AVSCVLTA call is appended after it.
        """
        port = self._free_port()
        source = f'CVM RETE\n{routes}AVSCVLTA({int_to_num(port, False)})'
        self._run_server(source)
        self._wait_for_server(port)
        return port

    def _fetch(self, port, path):
        """GET *path* from the local server on *port* and return the decoded body."""
        import urllib.request

        url = f"http://127.0.0.1:{port}{path}"
        # The context manager closes the response instead of leaking it.
        with urllib.request.urlopen(url, timeout=self._REQUEST_TIMEOUT) as resp:
            return resp.read().decode()

    def test_basic_handler(self):
        port = self._start_server(
            'PETITVR("/", FVNCTIO (petitio) VT {\n'
            'REDI("SALVE MVNDE")\n'
            '})\n'
        )
        self.assertEqual(self._fetch(port, "/"), "SALVE MVNDE")

    def test_multiple_routes(self):
        port = self._start_server(
            'PETITVR("/", FVNCTIO (p) VT {\nREDI("RADIX")\n})\n'
            'PETITVR("/nomen", FVNCTIO (p) VT {\nREDI("MARCVS")\n})\n'
        )
        self.assertEqual(self._fetch(port, "/"), "RADIX")
        self.assertEqual(self._fetch(port, "/nomen"), "MARCVS")

    def test_404_unmatched(self):
        import urllib.error
        import urllib.request

        port = self._start_server(
            'PETITVR("/", FVNCTIO (p) VT {\nREDI("ok")\n})\n'
        )
        with self.assertRaises(urllib.error.HTTPError) as ctx:
            urllib.request.urlopen(
                f"http://127.0.0.1:{port}/nonexistent", timeout=self._REQUEST_TIMEOUT
            )
        error = ctx.exception
        try:
            self.assertEqual(error.code, 404)
        finally:
            # HTTPError wraps the response; close it explicitly.
            error.close()

    def test_request_dict_via(self):
        port = self._start_server(
            'PETITVR("/echo", FVNCTIO (petitio) VT {\n'
            'REDI(petitio["via"])\n'
            '})\n'
        )
        self.assertEqual(self._fetch(port, "/echo"), "/echo")

    def test_request_dict_quaestio(self):
        port = self._start_server(
            'PETITVR("/q", FVNCTIO (petitio) VT {\n'
            'REDI(petitio["quaestio"])\n'
            '})\n'
        )
        self.assertEqual(self._fetch(port, "/q?nomen=Marcus"), "nomen=Marcus")

    def test_petitvr_stores_route(self):
        """PETITVR alone (without AVSCVLTA) just stores a route and returns NVLLVS."""
        source = 'CVM RETE\nPETITVR("/", FVNCTIO (p) VT {\nREDI("hi")\n})'
        run_test(self, source,
                 Program([ModuleCall("RETE")], [ExpressionStatement(BuiltIn("PETITVR", [
                     String("/"),
                     Fvnctio([ID("p")], [Redi([String("hi")])])
                 ]))]),
                 ValNul())