Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 65 additions & 3 deletions modelscan/scanners/h5/scan.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
import logging
from typing import List, Optional, Dict, Any
from typing import List, Optional, Dict, Any, Set


try:
Expand All @@ -22,6 +22,13 @@

logger = logging.getLogger("modelscan")

# Allow-list of top-level namespaces regarded as Keras/TensorFlow internals.
# A ``module`` reference found in an H5 ``model_config`` is only exempt from
# the unsafe-module report when it falls under one of these entries; every
# other module name is surfaced to the user.
_SAFE_KERAS_MODULE_PREFIXES = (
    "keras",
    "tensorflow",
    "tf_keras",
)


class H5LambdaDetectScan(SavedModelLambdaDetectScan):
def scan(
Expand Down Expand Up @@ -124,10 +131,65 @@ def _get_keras_h5_operator_names(self, model: Model) -> Optional[List[Any]]:
)
return ["JSONDecodeError"]

operators: List[Any] = []

if lambda_layers:
return ["Lambda"] * len(lambda_layers)
operators.extend(["Lambda"] * len(lambda_layers))

# Lambda layers are not the only code-execution path in an H5
# model_config. The config tree uses module/class_name pairs throughout
# (initializers, regularizers, constraints, dtype policies, custom
# layers) which Keras resolves via importlib on load. None of these were
# inspected, so a non-Keras module reference (e.g. builtins.exec hidden
# in a kernel_initializer) was reported as "0 issues" — a false
# negative. Recurse the whole config tree and flag any reference outside
# the Keras/TensorFlow namespace.
for module_ref in self._extract_unsafe_modules(model_config):
operators.append(f"UnsafeModule:{module_ref}")

return operators

@staticmethod
def _extract_unsafe_modules(
    config: Any, visited: Optional[Set[int]] = None
) -> List[str]:
    """Collect non-Keras module references from a model config tree.

    Walks every ``dict`` and ``list`` reachable from ``config`` in
    depth-first, document order. For each dict whose ``module`` entry is a
    non-empty string outside the allow-listed namespaces, a
    ``"<module>.<class_name>"`` string is recorded (``class_name`` falls
    back to ``"unknown"`` when the key is absent).

    Args:
        config: Parsed ``model_config`` (or any sub-tree of it). Values
            that are neither ``dict`` nor ``list`` are ignored.
        visited: Optional set of ``id()`` values for containers that have
            already been inspected. It is updated in place and prevents
            the same container object from being processed twice.

    Returns:
        The unsafe references in the order they were encountered; an empty
        list when none were found.
    """
    if visited is None:
        visited = set()

    unsafe: List[str] = []

    # The config comes from an untrusted file and may be nested arbitrarily
    # deep, so use an explicit stack instead of recursion: a hostile model
    # must not be able to abort the scan with a RecursionError.
    pending: List[Any] = [config]
    while pending:
        node = pending.pop()

        node_id = id(node)
        if node_id in visited:
            continue
        visited.add(node_id)

        if isinstance(node, dict):
            module = node.get("module")
            if isinstance(module, str) and module:
                # Match on a package boundary. A bare ``startswith`` would
                # also accept look-alike top-level packages such as
                # ``keras_evil`` or ``tensorflow_backdoor``.
                is_allowed = any(
                    module == prefix or module.startswith(prefix + ".")
                    for prefix in _SAFE_KERAS_MODULE_PREFIXES
                )
                if not is_allowed:
                    class_name = node.get("class_name", "unknown")
                    unsafe.append(f"{module}.{class_name}")
            children = list(node.values())
        elif isinstance(node, list):
            children = node
        else:
            continue

        # Push in reverse so containers are popped in their original order,
        # which keeps the result in the same order as a recursive walk.
        pending.extend(
            child
            for child in reversed(children)
            if isinstance(child, (dict, list))
        )

    return unsafe

def handle_binary_dependencies(
self, settings: Optional[Dict[str, Any]] = None
Expand Down
110 changes: 110 additions & 0 deletions tests/test_h5_nested_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
"""Regression tests for the H5 model_config nested-module-reference bypass.

Before the fix, ``H5LambdaDetectScan`` only inspected top-level layers for
``class_name == "Lambda"``. An attacker-controlled module reference hidden in a
nested config object (e.g. a ``kernel_initializer`` with
``module="builtins", class_name="exec"``) was reported as "0 issues" — a
false negative — even though Keras resolves that module via importlib when the
model is loaded with ``tf.keras.models.load_model(..., safe_mode=False)``.
"""

import json
from pathlib import Path
from typing import Any, Dict

import pytest

h5py = pytest.importorskip("h5py")

from modelscan.modelscan import ModelScan # noqa: E402
from modelscan.issues import IssueCode # noqa: E402


def _write_h5_with_model_config(path: Path, model_config: Dict[str, Any]) -> None:
    """Create an HDF5 file at ``path`` holding only a ``model_config`` attribute.

    The config is stored as a JSON string on the file's root attributes; no
    datasets or weights are written.
    """
    with h5py.File(path, "w") as h5_file:
        root_attrs = h5_file.attrs
        root_attrs["model_config"] = json.dumps(model_config)


def _nested_malicious_config() -> Dict[str, Any]:
# A standard Sequential model with no Lambda layer, but a Dense layer whose
# kernel_initializer references a non-Keras module. This is the exact shape
# the bypass exploited.
return {
"class_name": "Sequential",
"config": {
"name": "sequential",
"layers": [
{
"class_name": "Dense",
"config": {
"name": "dense",
"units": 8,
"kernel_initializer": {
"module": "builtins",
"class_name": "exec",
"config": {"code": "print('pwned')"},
"registered_name": "exec",
},
},
}
],
},
}


def _benign_config() -> Dict[str, Any]:
return {
"class_name": "Sequential",
"config": {
"name": "sequential",
"layers": [
{
"class_name": "Dense",
"config": {
"name": "dense",
"units": 8,
"kernel_initializer": {
"module": "keras.initializers",
"class_name": "GlorotUniform",
"config": {"seed": None},
"registered_name": None,
},
},
}
],
},
}


def test_h5_nested_unsafe_module_detected(tmp_path: Path) -> None:
    """A non-Keras module nested inside a layer config must be reported."""
    model_path = tmp_path / "malicious.h5"
    _write_h5_with_model_config(model_path, _nested_malicious_config())

    scanner = ModelScan()
    scanner.scan(model_path)

    # The file must be scanned (not skipped) and the nested module flagged.
    flagged = []
    for found in scanner.issues.all_issues:
        if found.code == IssueCode.UNSAFE_OPERATOR:
            flagged.append(found)

    assert flagged, "nested unsafe module reference was not detected (false negative)"

    exec_reported = any(
        "builtins.exec" in found.details.operator for found in flagged
    )
    assert (
        exec_reported
    ), "the builtins.exec reference should appear in the flagged operator"


def test_h5_benign_keras_module_not_flagged(tmp_path: Path) -> None:
    """A config that only references ``keras.initializers`` yields no findings."""
    model_path = tmp_path / "benign.h5"
    _write_h5_with_model_config(model_path, _benign_config())

    scanner = ModelScan()
    scanner.scan(model_path)

    flagged = []
    for found in scanner.issues.all_issues:
        if found.code == IssueCode.UNSAFE_OPERATOR:
            flagged.append(found)

    assert not flagged, "a standard keras.initializers reference must not be flagged"