Skip to content
144 changes: 143 additions & 1 deletion sagemaker-core/src/sagemaker/core/telemetry/telemetry_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,151 @@
}


def _telemetry_emitter(feature: str, func_name: str):
def _classify_error(e: Exception) -> str:
"""Classify an exception into an actionable error category."""
error_type = type(e).__name__
error_msg = str(e).lower()

if "validation" in error_type.lower() or "invalid" in error_msg or "must be" in error_msg:
return "validation_error"
if "accessdenied" in error_msg or "not authorized" in error_msg or "forbidden" in error_msg:
return "auth_error"
if "capacity" in error_msg or "insufficientcapacity" in error_msg or "resourcelimitexceeded" in error_msg:
return "capacity_error"
if "timeout" in error_type.lower() or "timed out" in error_msg or "timeout" in error_msg:
return "timeout_error"
if "not found" in error_msg or "does not exist" in error_msg or "could not find" in error_msg:
return "resource_not_found"
if "eula" in error_msg or "accept_eula" in error_msg:
return "eula_error"
if "throttl" in error_msg or "rate exceeded" in error_msg or "too many requests" in error_msg:
return "throttling_error"
if "connection" in error_msg or "network" in error_msg or "unreachable" in error_msg:
return "network_error"
return "unknown"


def _attr_to_key(attr: str) -> str:
"""Convert attribute name to camelCase telemetry key.

Examples: '_model_name' -> 'modelName', 'training_type' -> 'trainingType'
"""
attr = attr.lstrip("_")
parts = attr.split("_")
return parts[0] + "".join(p.capitalize() for p in parts[1:])


class TelemetryParamType:
"""Constants for telemetry parameter extraction types.

Used in the `telemetry_params` list passed to @_telemetry_emitter decorator.
Each entry in telemetry_params is a tuple of (name, type) or (name, type, value).

To add a new telemetry signal to any class:
1. Identify what you want to track (instance attribute, method return, or kwarg).
2. Pick the appropriate type constant below.
3. Add a tuple to the `telemetry_params` list on the decorator.

Example:
@_telemetry_emitter(
feature=Feature.MODEL_CUSTOMIZATION,
func_name="MyClass.my_method",
telemetry_params=[
("model_name", TelemetryParamType.ATTR_VALUE), # emits x-modelName=<value>
("networking", TelemetryParamType.ATTR_EXISTS), # emits x-hasNetworking=true/false
("_is_fine_tuned", TelemetryParamType.ATTR_CALL), # emits x-isFineTuned=True/False
("instance_type", TelemetryParamType.KWARG_VALUE), # emits x-instanceType=<kwarg value>
("kms_key_id", TelemetryParamType.KWARG_EXISTS), # emits x-hasKmsKeyId=true/false
],
)
"""

# Reads self.<name> and emits the actual value.
# Use for: model names, training types, modes — values useful for analytics.
# Emits nothing if the attribute is None.
ATTR_VALUE = "attr_value"

# Reads self.<name> and emits true/false based on whether it's set (not None).
# Use for: sensitive configs (KMS, VPC, MLflow) where you only need to know
# if the customer configured it, without exposing the actual value.
ATTR_EXISTS = "attr_exists"

# Calls self.<name>() and emits the return value.
# Use for: computed/derived values like _is_model_customization(), _is_nova_model().
ATTR_CALL = "attr_call"

# Reads kwargs[<name>] from the decorated method's keyword arguments and emits the value.
# Use for: method parameters not stored on self (e.g., instance_type passed to deploy()).
# Emits nothing if the kwarg is None or not provided.
KWARG_VALUE = "kwarg_value"

# Reads kwargs[<name>] and emits true/false based on whether it's provided and truthy.
# Use for: optional method parameters where you only need presence info
# (e.g., update_endpoint, imported_model_kms_key_id).
KWARG_EXISTS = "kwarg_exists"


def _extract_telemetry_params(instance, kwargs, telemetry_params=None) -> str:
"""Extract telemetry params from instance/kwargs based on telemetry_params list.

Args:
instance: The class instance (args[0]).
kwargs: The kwargs dict from the decorated function call.
telemetry_params: List of tuples defining what to extract.
- ("attr_name", ATTR_VALUE) → emit self.attr value
- ("attr_name", ATTR_EXISTS) → emit true/false
- ("method_name", ATTR_CALL) → call self.method(), emit return value
- ("kwarg_name", KWARG_VALUE) → emit kwargs value
- ("kwarg_name", KWARG_EXISTS) → emit true/false

Returns:
str: URL query params string.
"""
if not telemetry_params:
return ""
parts = []
T = TelemetryParamType
for param in telemetry_params:
name, kind = param[0], param[1]
key = _attr_to_key(name)
if kind == T.ATTR_VALUE:
value = getattr(instance, name, None)
if value is not None:
parts.append(f"&x-{key}={value}")
elif kind == T.ATTR_EXISTS:
value = getattr(instance, name, None)
parts.append(f"&x-has{key[0].upper()}{key[1:]}={'true' if value else 'false'}")
elif kind == T.ATTR_CALL:
method = getattr(instance, name, None)
if callable(method):
try:
parts.append(f"&x-{key}={method()}")
except Exception:
pass
elif kind == T.KWARG_VALUE:
value = kwargs.get(name) if kwargs else None
if value is not None:
parts.append(f"&x-{key}={value}")
elif kind == T.KWARG_EXISTS:
value = kwargs.get(name) if kwargs else None
parts.append(f"&x-has{key[0].upper()}{key[1:]}={'true' if value else 'false'}")
return "".join(parts)


def _telemetry_emitter(feature: str, func_name: str, telemetry_params=None):
"""Telemetry Emitter

Decorator to emit telemetry logs for SageMaker Python SDK functions. This class needs
sagemaker_session object as a member. Default session object is a pysdk v2 Session object
in this repo. When collecting telemetry for classes using sagemaker-core Session object,
we should be aware of its differences, such as sagemaker_session.sagemaker_config does not
exist in new Session class.

Args:
feature: The Feature enum value for this telemetry event.
func_name: Human-readable function name for tracking.
telemetry_params: Optional list of tuples defining granular params to extract.
See TelemetryParamType for available types.
"""

def decorator(func):
Expand Down Expand Up @@ -175,6 +312,10 @@ def wrapper(*args, **kwargs):
if created_by:
extra += f"&x-createdBy={quote(created_by, safe='')}"

# Extract granular telemetry params from the instance
if telemetry_params and len(args) > 0:
extra += _extract_telemetry_params(args[0], kwargs, telemetry_params)

start_timer = perf_counter()
try:
# Call the original function
Expand All @@ -200,6 +341,7 @@ def wrapper(*args, **kwargs):
stop_timer = perf_counter()
elapsed = stop_timer - start_timer
extra += f"&x-latency={round(elapsed, 2)}"
extra += f"&x-errorCategory={_classify_error(e)}"
if not telemetry_opt_out_flag:
_send_telemetry_request(
STATUS_TO_CODE[str(Status.FAILURE)],
Expand Down
Loading
Loading