🐐 LSP
This commit is contained in:
@@ -0,0 +1,6 @@
|
||||
"""CENTVRION language server package."""
|
||||
|
||||
def run() -> None:
    """Launch the CENTVRION language server on stdio.

    The call blocks until stdin closes.
    """
    # The import stays local to the function, so the server module is only
    # loaded when run() is actually called.
    from centvrion.lsp.server import create_server

    server = create_server()
    server.start_io()
|
||||
@@ -0,0 +1,210 @@
|
||||
"""Pure analysis helpers over the CENTVRION AST.
|
||||
|
||||
Functions here take source text and zero-based (line, character) LSP positions,
|
||||
and return plain Python values. No pygls types. The server layer adapts these
|
||||
to LSP messages.
|
||||
"""
|
||||
from dataclasses import dataclass
|
||||
from typing import Iterator, Optional, Union
|
||||
|
||||
from rply.errors import LexingError
|
||||
|
||||
from centvrion import ast_nodes
|
||||
from centvrion.ast_nodes import (
|
||||
Defini, Designa, DesignaDestructure, DesignaIndex, Fvnctio, ID, Invoca,
|
||||
Program,
|
||||
)
|
||||
from centvrion.lexer import Lexer
|
||||
from centvrion.parser import Parser
|
||||
|
||||
|
||||
@dataclass
class ParseFailure:
    """A lexing or parsing error together with where it occurred.

    Positions follow the LSP convention: ``line`` and ``character`` are
    zero-based. ``length`` is how many characters the editor squiggle should
    span and is always at least 1.
    """

    line: int       # zero-based line of the error
    character: int  # zero-based column of the error
    length: int     # squiggle width in characters (>= 1)
    message: str    # human-readable description of the error
|
||||
|
||||
|
||||
def parse(source: str) -> Union[Program, ParseFailure]:
    """Lex and parse ``source`` into a Program, or describe why that failed.

    A trailing newline is appended to ``source`` before lexing. A LexingError
    or SyntaxError raised while lexing/parsing is converted into a
    ParseFailure with a zero-based position and a squiggle length of 1. If
    the parser returns something other than a Program, a ParseFailure at
    (0, 0) is returned instead.
    """
    rply_lexer = Lexer().get_lexer()
    try:
        token_stream = rply_lexer.lex(source + "\n")
        result = Parser().parse(token_stream)
    except LexingError as err:
        # rply's colno is sometimes off by one for the failing character, but
        # idx is reliable, so derive (line, col) from idx instead.
        offset = err.source_pos.idx
        row, col = _idx_to_line_col(source, offset)
        offender = source[offset] if offset < len(source) else "?"
        return ParseFailure(
            line=row,
            character=col,
            length=1,
            message=f"Invalid character {offender!r}",
        )
    except SyntaxError as err:
        text = str(err)
        row, col = _extract_pos(text)
        return ParseFailure(line=row, character=col, length=1, message=text)

    if isinstance(result, Program):
        return result
    return ParseFailure(0, 0, 1, "Parser did not return a Program")
|
||||
|
||||
|
||||
def _idx_to_line_col(source: str, idx: int) -> tuple[int, int]:
|
||||
if idx < 0:
|
||||
return 0, 0
|
||||
prefix = source[:idx]
|
||||
line = prefix.count("\n")
|
||||
last_nl = prefix.rfind("\n")
|
||||
character = idx - (last_nl + 1) if last_nl >= 0 else idx
|
||||
return line, character
|
||||
|
||||
|
||||
def _extract_pos(msg: str) -> tuple[int, int]:
|
||||
"""Pull (line, col) out of the parser's 'at line N, column M' error format.
|
||||
|
||||
The CENTVRION parser's error handler (parser.py:476) builds messages like
|
||||
'Unexpected token KEYWORD_X at line 3, column 5'. Falls back to (0, 0).
|
||||
"""
|
||||
import re
|
||||
m = re.search(r"line (\d+), column (\d+)", msg)
|
||||
if m:
|
||||
return max(int(m.group(1)) - 1, 0), max(int(m.group(2)) - 1, 0)
|
||||
return 0, 0
|
||||
|
||||
|
||||
def walk(node) -> Iterator:
    """Yield ``node`` and then every AST node reachable from it.

    ``None`` yields nothing. For any other node, each instance attribute is
    handed to ``_walk_value``, which follows attributes that are a Node, an
    ast_nodes.Program, a list of those, or a list of (key, value) pairs.
    """
    if node is not None:
        yield node
        for attribute in vars(node).values():
            yield from _walk_value(attribute)
|
||||
|
||||
|
||||
def _walk_value(value):
    """Yield the AST nodes reachable from a single attribute value.

    A Node or Program is walked recursively. A list is expanded element by
    element; tuple elements are expanded into their members first. Any other
    value (including None) yields nothing.
    """
    if isinstance(value, (ast_nodes.Node, Program)):
        yield from walk(value)
        return
    if not isinstance(value, list):
        return
    for entry in value:
        # Treat a plain element like a one-member tuple so both cases share
        # the same loop.
        members = entry if isinstance(entry, tuple) else (entry,)
        for member in members:
            yield from _walk_value(member)
|
||||
|
||||
|
||||
@dataclass
class Definition:
    """Where a named symbol is defined, using zero-based coordinates."""

    name: str       # identifier text
    line: int       # zero-based line of the defining identifier
    character: int  # zero-based column of the defining identifier
    kind: str       # "function" or "variable"
|
||||
|
||||
|
||||
def collect_definitions(program: Program) -> dict[str, Definition]:
    """Build a name -> Definition table from a parsed program.

    Every Defini contributes a "function" entry; every Designa, DesignaIndex
    and each identifier of a DesignaDestructure contributes a "variable"
    entry. Identifiers whose ``pos`` is None are skipped.

    When a name is both a function and a variable, the function wins. Among
    several assignments to the same variable the first one wins, so
    go-to-definition lands on the introducing site even though CENTVRION
    re-assigns with the same DESIGNA keyword.
    """
    functions: dict[str, Definition] = {}
    variables: dict[str, Definition] = {}

    def record(table: dict[str, Definition], ident, kind: str) -> None:
        # setdefault keeps whichever definition was seen first for a name.
        row, col = _to_zero_based(ident.pos)
        table.setdefault(
            ident.name,
            Definition(name=ident.name, line=row, character=col, kind=kind),
        )

    for node in walk(program):
        if isinstance(node, Defini) and node.name.pos is not None:
            record(functions, node.name, "function")
        elif isinstance(node, (Designa, DesignaIndex)) and node.id.pos is not None:
            record(variables, node.id, "variable")
        elif isinstance(node, DesignaDestructure):
            for ident in node.ids:
                if ident.pos is not None:
                    record(variables, ident, "variable")

    # Functions are merged last so they override variables of the same name.
    return {**variables, **functions}
|
||||
|
||||
|
||||
def definition_at(source: str, line: int, character: int) -> Optional[Definition]:
    """Find the definition of the identifier under the cursor.

    The token under (line, character) is located with ``token_at``; if it is
    an ID token, ``source`` is parsed and the token's text is looked up in
    the table produced by ``collect_definitions``.

    Returns None when there is no ID token at that position, when ``parse``
    reports a ParseFailure, or when the name has no recorded definition.
    """
    tok = token_at(source, line, character)
    if tok is not None and tok.gettokentype() == "ID":
        parsed = parse(source)
        if not isinstance(parsed, ParseFailure):
            return collect_definitions(parsed).get(tok.getstr())
    return None
|
||||
|
||||
|
||||
# Token types that read as a "word"; preferred when the cursor touches two
# tokens at their shared boundary.
_WORD_TOKENS = ("ID", "DATA_NUMERAL", "BUILTIN", "MODULE")


def token_at(source: str, line: int, character: int):
    """Return the rply Token under the cursor at (line, character), or None.

    Cursor position N (0-based) sits between characters N-1 and N, so a token
    covering character columns [start, end) matches for any cursor in
    [start, end] (end-inclusive). When two tokens touch the cursor at their
    shared boundary (e.g. cursor between `digits` and `)`), the
    identifier-like one (see ``_WORD_TOKENS``) is preferred so go-to-def
    behaves the way users expect from other editors.

    Tolerant of LexingError: the lexer is consumed token by token, so every
    token produced before the failure is still searched.

    Args:
        source: Full document text; a trailing newline is appended for lexing.
        line: Zero-based cursor line.
        character: Zero-based cursor column.

    Returns:
        The matching token, or None when no token touches the cursor.
    """
    lexer = Lexer().get_lexer()
    tokens = []
    try:
        # Deliberately not list(...): building the list in one expression
        # would throw away the tokens already lexed when the error is raised.
        for tok in lexer.lex(source + "\n"):
            tokens.append(tok)
    except LexingError:
        pass

    # rply positions are 1-based; shift the cursor instead of every token.
    target_line = line + 1
    target_col = character + 1

    candidates = []
    for tok in tokens:
        sp = tok.source_pos
        if sp is None or sp.lineno != target_line:
            continue
        start = sp.colno
        end = start + len(tok.getstr())
        if start <= target_col <= end:
            candidates.append(tok)

    for tok in candidates:
        if tok.gettokentype() in _WORD_TOKENS:
            return tok
    return candidates[0] if candidates else None
|
||||
|
||||
|
||||
def _to_zero_based(pos: tuple[int, int]) -> tuple[int, int]:
|
||||
"""rply (lineno, colno) is 1-based; LSP wants 0-based (line, character)."""
|
||||
return pos[0] - 1, pos[1] - 1
|
||||
@@ -0,0 +1,21 @@
|
||||
"""Convert CENTVRION parse failures into LSP-style diagnostic dicts.
|
||||
|
||||
Returns a list of plain dicts so the server layer can map them to
|
||||
lsprotocol types without this module importing pygls. Runtime errors
|
||||
are explicitly out of scope: we only catch lex and parse failures.
|
||||
"""
|
||||
from centvrion.lsp.analysis import ParseFailure, parse
|
||||
|
||||
|
||||
def build_diagnostics(source: str) -> list[dict]:
    """Return LSP-style diagnostic dicts for ``source``.

    The source is parsed once. A successful parse yields an empty list; a
    ParseFailure yields a single-element list describing it, with severity 1
    (Error) and source "centvrion".
    """
    outcome = parse(source)
    if isinstance(outcome, ParseFailure):
        diagnostic = {
            "line": outcome.line,
            "character": outcome.character,
            "length": outcome.length,
            "message": outcome.message,
            "severity": 1,  # Error
            "source": "centvrion",
        }
        return [diagnostic]
    return []
|
||||
@@ -0,0 +1,38 @@
|
||||
"""Hover information for the LSP server.
|
||||
|
||||
Currently only handles Roman numeral literals: hovering a DATA_NUMERAL token
|
||||
shows its decimal value. The MAGNVM / SVBNVLLA module gating is bypassed for
|
||||
display purposes; if the user has written it, we'll show the decimal.
|
||||
"""
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
from centvrion.ast_nodes import num_to_int
|
||||
from centvrion.errors import CentvrionError
|
||||
|
||||
from centvrion.lsp.analysis import token_at
|
||||
|
||||
|
||||
@dataclass
class HoverInfo:
    """Hover text plus the single-line span of source it refers to."""

    text: str             # hover body (rendered as Markdown by the server)
    line: int             # zero-based line of the hovered token
    start_character: int  # zero-based column where the token starts
    end_character: int    # zero-based column just past the token's end
|
||||
|
||||
|
||||
def hover_at(source: str, line: int, character: int) -> Optional[HoverInfo]:
    """Describe the Roman numeral under the cursor, if there is one.

    Returns None when the token under (line, character) is missing or is not
    a DATA_NUMERAL, or when ``num_to_int`` raises CentvrionError for it.
    Otherwise returns a HoverInfo whose text shows the numeral and its
    decimal value and whose span covers the token on the cursor's line.
    """
    tok = token_at(source, line, character)
    if tok is None:
        return None
    if tok.gettokentype() != "DATA_NUMERAL":
        return None

    numeral = tok.getstr()
    try:
        # m=True / s=True: convert regardless of module gating, for display.
        decimal = num_to_int(numeral, m=True, s=True)
    except CentvrionError:
        return None

    first = tok.source_pos.colno - 1  # rply columns are 1-based
    return HoverInfo(
        text=f"`{numeral}` = **{decimal}**",
        line=line,
        start_character=first,
        end_character=first + len(numeral),
    )
|
||||
@@ -0,0 +1,89 @@
|
||||
"""pygls language server for CENTVRION.
|
||||
|
||||
Thin adapter: protocol handlers convert LSP params to source + position,
|
||||
delegate to the pure analysis/hover/diagnostics modules, and convert results
|
||||
back to lsprotocol types.
|
||||
"""
|
||||
import asyncio
|
||||
|
||||
import lsprotocol.types as t
|
||||
from pygls.lsp.server import LanguageServer
|
||||
|
||||
from centvrion.lsp import analysis, diagnostics, hover
|
||||
|
||||
|
||||
# Delay awaited after a change before diagnostics are re-published.
_DEBOUNCE_SECONDS = 0.15
# Most recently scheduled debounced publish task, keyed by document URI.
_pending: dict[str, asyncio.Task] = {}


def create_server() -> LanguageServer:
    """Build the CENTVRION language server and register its LSP handlers.

    Handlers registered:
      * didOpen    - publish diagnostics immediately.
      * didChange  - cancel any waiting publish for the document and schedule
                     a new debounced one.
      * hover      - delegate to ``hover.hover_at``.
      * definition - delegate to ``analysis.definition_at``.

    Returns the configured server; the caller is responsible for starting it.
    """
    server = LanguageServer("centvrion-lsp", "0.1.0")

    @server.feature(t.TEXT_DOCUMENT_DID_OPEN)
    def did_open(params: t.DidOpenTextDocumentParams):
        _publish(server, params.text_document.uri)

    @server.feature(t.TEXT_DOCUMENT_DID_CHANGE)
    def did_change(params: t.DidChangeTextDocumentParams):
        uri = params.text_document.uri
        # Drop a publish that is still waiting out its debounce delay.
        previous = _pending.pop(uri, None)
        if previous is not None and not previous.done():
            previous.cancel()
        loop = asyncio.get_event_loop()
        _pending[uri] = loop.create_task(_debounced_publish(server, uri))

    @server.feature(t.TEXT_DOCUMENT_HOVER)
    def on_hover(params: t.HoverParams):
        doc = server.workspace.get_text_document(params.text_document.uri)
        cursor = params.position
        info = hover.hover_at(doc.source, cursor.line, cursor.character)
        if info is None:
            return None
        span = t.Range(
            start=t.Position(line=info.line, character=info.start_character),
            end=t.Position(line=info.line, character=info.end_character),
        )
        body = t.MarkupContent(kind=t.MarkupKind.Markdown, value=info.text)
        return t.Hover(contents=body, range=span)

    @server.feature(t.TEXT_DOCUMENT_DEFINITION)
    def on_definition(params: t.DefinitionParams):
        doc = server.workspace.get_text_document(params.text_document.uri)
        cursor = params.position
        defn = analysis.definition_at(doc.source, cursor.line, cursor.character)
        if defn is None:
            return None
        # The target range covers exactly the defining identifier.
        span = t.Range(
            start=t.Position(line=defn.line, character=defn.character),
            end=t.Position(line=defn.line, character=defn.character + len(defn.name)),
        )
        return t.Location(uri=params.text_document.uri, range=span)

    return server
|
||||
|
||||
|
||||
async def _debounced_publish(ls: LanguageServer, uri: str) -> None:
    """Publish diagnostics for ``uri`` after the debounce delay.

    If the task is cancelled while waiting, it returns without publishing.
    """
    try:
        await asyncio.sleep(_DEBOUNCE_SECONDS)
    except asyncio.CancelledError:
        # A newer change superseded this one; skip the publish.
        return
    else:
        _publish(ls, uri)
|
||||
|
||||
|
||||
def _publish(ls: LanguageServer, uri: str) -> None:
    """Compute diagnostics for the document at ``uri`` and send them.

    Each dict from ``diagnostics.build_diagnostics`` is converted into an
    lsprotocol Diagnostic whose range starts at (line, character) and spans
    ``length`` characters on that line. The resulting list (possibly empty)
    is published for ``uri``.
    """
    source = ls.workspace.get_text_document(uri).source

    diags = []
    for entry in diagnostics.build_diagnostics(source):
        begin = t.Position(line=entry["line"], character=entry["character"])
        finish = t.Position(
            line=entry["line"],
            character=entry["character"] + entry["length"],
        )
        diags.append(
            t.Diagnostic(
                range=t.Range(start=begin, end=finish),
                message=entry["message"],
                severity=t.DiagnosticSeverity(entry["severity"]),
                source=entry["source"],
            )
        )

    ls.text_document_publish_diagnostics(
        t.PublishDiagnosticsParams(uri=uri, diagnostics=diags)
    )
|
||||
Reference in New Issue
Block a user