Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 55 additions & 10 deletions python_files/tests/pytestadapter/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from typing import Any, Dict, List, Optional, Tuple

if sys.platform == "win32":
from namedpipe import NPopen
from namedpipe import NPopen # pylint: disable=import-error # cspell: disable-line


script_dir = pathlib.Path(__file__).parent.parent.parent
Expand Down Expand Up @@ -54,7 +54,7 @@ def create_symlink(root: pathlib.Path, target_ext: str, destination_ext: str):
print("destination already exists", destination)
try:
destination.symlink_to(target)
except Exception as e:
except OSError as e:
print("error occurred when attempting to create a symlink", e)
yield target, destination
finally:
Expand Down Expand Up @@ -82,12 +82,57 @@ def process_data_received(data: str) -> List[Dict[str, Any]]:
elif json_data["jsonrpc"] != "2.0":
raise ValueError("Invalid JSON-RPC version received, not version 2.0")
else:
json_messages.append(json_data["params"])
json_messages.append(expand_compact_discovery_payload(json_data["params"]))

return json_messages # return the list of json messages


def parse_rpc_message(data: str) -> Tuple[Dict[str, str], str]:
def expand_path(path_value: str, path_base: str) -> str:
if not path_base or pathlib.Path(path_value).is_absolute():
return path_value
if path_value == ".":
return path_base
return os.fspath(pathlib.Path(path_base, path_value))


def expand_test_id(test_id: str, id_base: str) -> str:
test_path, separator, selector = test_id.partition("::")
expanded_test_path = expand_path(test_path, id_base)
return f"{expanded_test_path}{separator}{selector}" if separator else expanded_test_path


def expand_compact_discovery_node(
test_node: Dict[str, Any] | None, path_base: str, id_base: str
) -> Dict[str, Any] | None:
if test_node is None:
return None
expanded_node = dict(test_node)
if "path" in expanded_node:
expanded_node["path"] = expand_path(expanded_node["path"], path_base)
if "id_" in expanded_node:
expanded_node["id_"] = expand_test_id(expanded_node["id_"], id_base)
if "runID" in expanded_node:
expanded_node["runID"] = expand_test_id(expanded_node["runID"], id_base)
if "children" in expanded_node:
expanded_node["children"] = [
expand_compact_discovery_node(child, path_base, id_base)
for child in expanded_node["children"]
]
return expanded_node


def expand_compact_discovery_payload(payload: Dict[str, Any]) -> Dict[str, Any]:
path_base = payload.get("pathBase")
if not path_base or "tests" not in payload or payload["tests"] is None:
return payload

expanded_payload = dict(payload)
id_base = payload.get("idBase", path_base)
expanded_payload["tests"] = expand_compact_discovery_node(payload["tests"], path_base, id_base)
return expanded_payload


def parse_rpc_message(data: str) -> Tuple[Dict[str, Any], str]:
"""Process the JSON data which comes from the server.

A single rpc payload is in the format:
Expand Down Expand Up @@ -122,7 +167,7 @@ def parse_rpc_message(data: str) -> Tuple[Dict[str, str], str]:
line: str = str_stream.readline(length)
try:
# try to parse the json, if successful it is single payload so return with remaining data
json_data: dict[str, str] = json.loads(line)
json_data: dict[str, Any] = json.loads(line)
return json_data, str_stream.read()
except json.JSONDecodeError:
print("json decode error")
Expand All @@ -131,7 +176,7 @@ def parse_rpc_message(data: str) -> Tuple[Dict[str, str], str]:
def _listen_on_fifo(pipe_name: str, result: List[str], completed: threading.Event):
# Open the FIFO for reading
fifo_path = pathlib.Path(pipe_name)
with fifo_path.open() as fifo:
with fifo_path.open(encoding="utf-8") as fifo:
print("Waiting for data...")
while True:
if completed.is_set():
Expand Down Expand Up @@ -198,7 +243,7 @@ def _listen_on_pipe_new(listener, result: List[str], completed: threading.Event)


def _run_test_code(proc_args: List[str], proc_env, proc_cwd: str, completed: threading.Event):
result = subprocess.run(proc_args, env=proc_env, cwd=proc_cwd)
result = subprocess.run(proc_args, env=proc_env, cwd=proc_cwd, check=False)
completed.set()
return result

Expand Down Expand Up @@ -257,7 +302,7 @@ def runner_with_cwd_env(
)
env_add.update({"RUN_TEST_IDS_PIPE": test_ids_pipe})
test_ids_arr = after_ids
with open(test_ids_pipe, "w") as f: # noqa: PTH123
with open(test_ids_pipe, "w", encoding="utf-8") as f: # noqa: PTH123
f.write("\n".join(test_ids_arr))
else:
process_args = [sys.executable, "-m", "pytest", "-p", "vscode_pytest", "-s", *args]
Expand Down Expand Up @@ -379,7 +424,7 @@ def find_test_line_number(test_name: str, test_file_path) -> str:
test_file_path: The path to the test file where the test is located.
"""
test_file_unique_id: str = "test_marker--" + test_name.split("[")[0]
with open(test_file_path) as f: # noqa: PTH123
with open(test_file_path, encoding="utf-8") as f: # noqa: PTH123
for i, line in enumerate(f):
if test_file_unique_id in line:
return str(i + 1)
Expand All @@ -395,7 +440,7 @@ def find_class_line_number(class_name: str, test_file_path) -> str:
test_file_path: The path to the test file where the class is located.
"""
# Look for the class definition line (or function for pytest-describe)
with open(test_file_path) as f: # noqa: PTH123
with open(test_file_path, encoding="utf-8") as f: # noqa: PTH123
for i, line in enumerate(f):
# Match "class ClassName" or "class ClassName(" or "class ClassName:"
# Also match "def ClassName(" for pytest-describe blocks
Expand Down
132 changes: 131 additions & 1 deletion python_files/tests/pytestadapter/test_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,146 @@
# Licensed under the MIT License.
import json
import os
import pathlib
import sys
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, cast

import pytest

import vscode_pytest
from tests.tree_comparison_helper import is_same_tree

from . import expected_discovery_test_output, helpers


def test_compact_discovery_payload_keeps_absolute_tree_until_return(tmp_path, monkeypatch):
monkeypatch.setattr(vscode_pytest, "ERRORS", [])
base_path = tmp_path / "workspace"
test_file = base_path / "tests" / "test_sample.py"
absolute_test_id = f"{os.fspath(test_file)}::test_case[param]"
session_node: vscode_pytest.TestNode = {
"name": "workspace",
"path": base_path,
"type_": "folder",
"id_": os.fspath(base_path),
"children": [
{
"name": "test_sample.py",
"path": test_file,
"type_": "file",
"id_": os.fspath(test_file),
"children": [
{
"name": "test_case[param]",
"path": test_file,
"type_": "test",
"id_": absolute_test_id,
"runID": absolute_test_id,
"lineno": "7",
}
],
}
],
}

payload = vscode_pytest.create_compact_discovery_payload(os.fspath(base_path), session_node)

assert session_node["path"] == base_path
file_node = cast("vscode_pytest.TestNode", session_node["children"][0])
assert file_node is not None
assert file_node["path"] == test_file
test_node = cast("vscode_pytest.TestItem", file_node["children"][0])
assert test_node is not None
assert test_node["id_"] == absolute_test_id

assert payload["pathBase"] == os.fspath(base_path)
assert payload["idBase"] == os.fspath(base_path)
assert payload["tests"] is not None
compact_tests = cast("Dict[str, Any]", payload["tests"])
assert compact_tests["path"] == "."
assert compact_tests["id_"] == "."
compact_file_node = cast("Dict[str, Any]", compact_tests["children"][0])
assert compact_file_node["path"] == os.fspath(pathlib.Path("tests", "test_sample.py"))
assert compact_file_node["id_"] == os.fspath(pathlib.Path("tests", "test_sample.py"))
compact_test_node = cast("Dict[str, Any]", compact_file_node["children"][0])
assert compact_test_node["path"] == os.fspath(pathlib.Path("tests", "test_sample.py"))
assert (
compact_test_node["id_"]
== os.fspath(pathlib.Path("tests", "test_sample.py")) + "::test_case[param]"
)
assert (
compact_test_node["runID"]
== os.fspath(pathlib.Path("tests", "test_sample.py")) + "::test_case[param]"
)


def test_compact_discovery_payload_keeps_paths_outside_base_absolute(tmp_path):
base_path = tmp_path / "workspace"
external_file = tmp_path / "external" / "test_external.py"

assert vscode_pytest.compact_path(external_file, base_path) == os.fspath(external_file)
assert (
vscode_pytest.compact_test_id(f"{os.fspath(external_file)}::test_external", base_path)
== f"{os.fspath(external_file)}::test_external"
)


def test_compact_discovery_payload_expands_after_rpc_parsing(tmp_path):
base_path = os.fspath(tmp_path / "workspace")
payload = {
"cwd": base_path,
"status": "success",
"payloadVersion": 2,
"pathBase": base_path,
"idBase": base_path,
"tests": {
"name": "workspace",
"path": ".",
"type_": "folder",
"id_": ".",
"children": [
{
"name": "test_sample.py",
"path": "tests/test_sample.py",
"type_": "file",
"id_": "tests/test_sample.py",
"children": [
{
"name": "test_case[param]",
"path": "tests/test_sample.py",
"type_": "test",
"id_": "tests/test_sample.py::test_case[param]",
"runID": "tests/test_sample.py::test_case[param]",
"lineno": "7",
}
],
}
],
},
"error": [],
}
body = json.dumps({"jsonrpc": "2.0", "params": payload})
framed_message = f"content-length: {len(body)}\r\ncontent-type: application/json\r\n\r\n{body}"
chunked_message = "".join(
[framed_message[:13], framed_message[13:97], framed_message[97:]]
)

parsed_payload = helpers.process_data_received(chunked_message)[0]

assert parsed_payload["tests"]["path"] == base_path
parsed_file_node = parsed_payload["tests"]["children"][0]
assert parsed_file_node["path"] == os.fspath(pathlib.Path(base_path, "tests/test_sample.py"))
parsed_test_node = parsed_file_node["children"][0]
assert (
parsed_test_node["id_"]
== os.fspath(pathlib.Path(base_path, "tests/test_sample.py")) + "::test_case[param]"
)
assert (
parsed_test_node["runID"]
== os.fspath(pathlib.Path(base_path, "tests/test_sample.py")) + "::test_case[param]"
)


def test_import_error():
"""Test pytest discovery on a file that has a pytest marker but does not import pytest.

Expand Down
Loading