diff --git a/docker-compose/deeploy-testbed.local-core.yaml b/docker-compose/deeploy-testbed.local-core.yaml new file mode 100644 index 00000000..32b5b8f0 --- /dev/null +++ b/docker-compose/deeploy-testbed.local-core.yaml @@ -0,0 +1,8 @@ +services: + deeploy_test_node_1: + volumes: + - ${DEEPLOY_TESTBED_CORE_PATH:?set DEEPLOY_TESTBED_CORE_PATH to the local naeural_core package directory}:/opt/python/lib/python3.13/site-packages/naeural_core:ro + + deeploy_test_node_2: + volumes: + - ${DEEPLOY_TESTBED_CORE_PATH:?set DEEPLOY_TESTBED_CORE_PATH to the local naeural_core package directory}:/opt/python/lib/python3.13/site-packages/naeural_core:ro diff --git a/docker-compose/deeploy-testbed.yaml b/docker-compose/deeploy-testbed.yaml index 944dfdf5..100d83c7 100644 --- a/docker-compose/deeploy-testbed.yaml +++ b/docker-compose/deeploy-testbed.yaml @@ -27,7 +27,7 @@ x-edge-node-env: &edge-node-env EE_TUNNEL_ENGINE: none x-edge-node: &edge-node - image: local_edge_node_deeploy_0002 + image: local_edge_node_deeploy_testbed build: context: .. dockerfile: Dockerfile_devnet diff --git a/docker-compose/deeploy-testbed/README.md b/docker-compose/deeploy-testbed/README.md index 58454cb0..0e910f81 100644 --- a/docker-compose/deeploy-testbed/README.md +++ b/docker-compose/deeploy-testbed/README.md @@ -11,9 +11,22 @@ PYTHONPATH=/mnt/c/repos/naeural_core:/mnt/c/repos/naeural_client:. /home/bleot/v docker compose -f docker-compose/deeploy-testbed.yaml down -v ``` +When validating local `naeural_core` receiver changes before they are published, +mount the local package into both node containers: + +```bash +DEEPLOY_TESTBED_CORE_PATH=/absolute/path/to/naeural_core/naeural_core \ +docker compose -f docker-compose/deeploy-testbed.yaml -f docker-compose/deeploy-testbed.local-core.yaml up -d --build +PYTHONPATH=/absolute/path/to/naeural_core:/absolute/path/to/naeural_client:. python docker-compose/deeploy-testbed/validate_delete_workflow.py +DEEPLOY_TESTBED_CORE_PATH=/absolute/path/to/naeural_core/naeural_core \ +docker compose -f docker-compose/deeploy-testbed.yaml -f docker-compose/deeploy-testbed.local-core.yaml down -v +``` + The validation script creates one Deeploy-like multi-plugin app pipeline on each local node, calls the real `delete_pipeline_from_nodes()` path, and asserts that each node receives exactly one `DELETE_CONFIG` for the app. +It also updates the pipeline to a newer Deeploy lifecycle generation, sends an +older structured delete command, and asserts the newer saved config remains. The app uses testbed-only data/business plugins mounted into the node containers under the normal root plugin search paths. diff --git a/docker-compose/deeploy-testbed/validate_delete_workflow.py b/docker-compose/deeploy-testbed/validate_delete_workflow.py index 05b61a67..55868191 100644 --- a/docker-compose/deeploy-testbed/validate_delete_workflow.py +++ b/docker-compose/deeploy-testbed/validate_delete_workflow.py @@ -1,4 +1,5 @@ #!/usr/bin/env python3 +import copy import json import os import subprocess @@ -80,6 +81,10 @@ def stream_exists(container, app_id): return result.returncode == 0 +def read_stream_config(container, app_id): + return read_json_file(container, stream_config_path(app_id)) + + def wait_for_stream_state(container, app_id, expected_exists): deadline = time.time() + POLL_TIMEOUT while time.time() < deadline: @@ -90,6 +95,26 @@ def wait_for_stream_state(container, app_id, expected_exists): raise TimeoutError(f"Timed out waiting for {app_id} to {state} on {container}") +def wait_for_stream_generation(container, app_id, generation): + deadline = time.time() + POLL_TIMEOUT + last_generation = None + while time.time() < deadline: + if stream_exists(container, app_id): + try: + config = read_stream_config(container, app_id) + specs = config.get("DEEPLOY_SPECS", {}) + last_generation = specs.get(DEEPLOY_KEYS.LIFECYCLE_GENERATION) + if last_generation == generation: + return config + except Exception: + pass + time.sleep(2) + raise TimeoutError( + f"Timed out waiting for {app_id} generation {generation} on {container}; " + f"last observed generation was {last_generation}" + ) + + def list_received_command_files(container): cmd = "find /edge_node/_local_cache/_output/received_commands -type f -name '*.json' -print 2>/dev/null | sort" result = docker_exec(container, "sh", "-c", cmd, check=False) @@ -110,12 +135,28 @@ def count_delete_commands(container, app_id): data = read_json_file(container, path) except Exception: continue - if data.get("ACTION") == "DELETE_CONFIG" and data.get("PAYLOAD") == app_id: + payload = data.get("PAYLOAD") + payload_app_id = payload.get("NAME") if isinstance(payload, dict) else payload + if data.get("ACTION") == "DELETE_CONFIG" and payload_app_id == app_id: count += 1 return count -def make_pipeline_config(app_id, node_addresses): +def wait_for_delete_command_count(container, app_id, expected_count): + deadline = time.time() + POLL_TIMEOUT + last_count = None + while time.time() < deadline: + last_count = count_delete_commands(container, app_id) + if last_count >= expected_count: + return last_count + time.sleep(2) + raise TimeoutError( + f"Timed out waiting for {expected_count} DELETE_CONFIG command(s) for {app_id} " + f"on {container}; last observed count was {last_count}" + ) + + +def make_pipeline_config(app_id, node_addresses, lifecycle_generation=1, lifecycle_operation="create"): now = time.time() return { ct.CONFIG_STREAM.NAME: app_id, @@ -132,7 +173,9 @@ def make_pipeline_config(app_id, node_addresses): DEEPLOY_KEYS.CURRENT_TARGET_NODES: list(node_addresses), DEEPLOY_KEYS.DATE_CREATED: now, DEEPLOY_KEYS.DATE_UPDATED: now, - DEEPLOY_KEYS.JOB_TAGS: ["deeploy-0002-e2e"], + DEEPLOY_KEYS.LIFECYCLE_GENERATION: lifecycle_generation, + DEEPLOY_KEYS.LIFECYCLE_OPERATION: lifecycle_operation, + DEEPLOY_KEYS.JOB_TAGS: ["deeploy-0003-e2e"], DEEPLOY_KEYS.SPARE_NODES: [], DEEPLOY_KEYS.ALLOW_REPLICATION_IN_THE_WILD: False, }, @@ -148,7 +191,27 @@ def make_pipeline_config(app_id, node_addresses): } -def make_discovered_instances(app_id, node_addresses): +def make_delete_payload(app_id, deeploy_specs): + command_specs = {} + for key in ( + DEEPLOY_KEYS.JOB_ID, + DEEPLOY_KEYS.PROJECT_ID, + DEEPLOY_KEYS.LIFECYCLE_GENERATION, + DEEPLOY_KEYS.LIFECYCLE_OPERATION, + DEEPLOY_KEYS.DATE_CREATED, + DEEPLOY_KEYS.DATE_UPDATED, + ): + if key in deeploy_specs: + command_specs[key] = copy.deepcopy(deeploy_specs[key]) + command_specs[DEEPLOY_KEYS.LIFECYCLE_OPERATION] = "delete" + return { + ct.CONFIG_STREAM.NAME: app_id, + ct.CONFIG_STREAM.K_OWNER: OWNER, + ct.CONFIG_STREAM.DEEPLOY_SPECS: command_specs, + } + + +def make_discovered_instances(app_id, node_addresses, deeploy_specs): discovered = [] for node in node_addresses: for instance_id in PLUGIN_INSTANCES: @@ -162,6 +225,7 @@ def make_discovered_instances(app_id, node_addresses): "instance_conf": {}, }, DEEPLOY_PLUGIN_DATA.CHAINSTORE_RESPONSE_KEY: None, + DEEPLOY_PLUGIN_DATA.DEEPLOY_SPECS: copy.deepcopy(deeploy_specs), }) return discovered @@ -174,9 +238,10 @@ def make_delete_harness(session): plugin = RuntimeDeleteHarness.__new__(RuntimeDeleteHarness) plugin.P = lambda msg, *args, **kwargs: print(f"[deeploy-delete] {msg}") plugin.Pd = plugin.P - plugin.cmdapi_stop_pipeline = lambda node_address, name: session._send_command_delete_pipeline( + plugin.deepcopy = copy.deepcopy + plugin.cmdapi_stop_pipeline = lambda node_address, name, command_content=None: session._send_command_delete_pipeline( node_address, - name, + command_content if command_content is not None else name, show_command=False, ) return plugin @@ -197,7 +262,7 @@ def main(): secured=False, encrypt_comms=False, root_topic=ROOT_TOPIC, - name="deeploy-0002-e2e", + name="deeploy-0003-e2e", auto_configuration=False, run_dauth=False, use_home_folder=False, @@ -220,6 +285,37 @@ def main(): for container in CONTAINERS: wait_for_stream_state(container, APP_ID, expected_exists=True) + wait_for_stream_generation(container, APP_ID, generation=1) + + updated_pipeline_config = make_pipeline_config( + APP_ID, + node_addresses, + lifecycle_generation=2, + lifecycle_operation="update", + ) + for node_address in node_addresses: + session._send_command_create_pipeline(node_address, updated_pipeline_config, show_command=False) + + for container in CONTAINERS: + wait_for_stream_generation(container, APP_ID, generation=2) + + stale_delete_counts = { + container: count_delete_commands(container, APP_ID) + for container in CONTAINERS + } + stale_payload = make_delete_payload(APP_ID, pipeline_config["DEEPLOY_SPECS"]) + for node_address in node_addresses: + session._send_command_delete_pipeline(node_address, stale_payload, show_command=False) + + for container in CONTAINERS: + wait_for_delete_command_count(container, APP_ID, stale_delete_counts[container] + 1) + + # The stale delete has been received; leave the node a short settle window + # before asserting that the newer generation is still the saved config. + time.sleep(5) + for container in CONTAINERS: + wait_for_stream_state(container, APP_ID, expected_exists=True) + wait_for_stream_generation(container, APP_ID, generation=2) baseline_counts = { container: count_delete_commands(container, APP_ID) @@ -227,7 +323,11 @@ def main(): } harness = make_delete_harness(session) - discovered = make_discovered_instances(APP_ID, node_addresses) + discovered = make_discovered_instances( + APP_ID, + node_addresses, + updated_pipeline_config["DEEPLOY_SPECS"], + ) discovery_calls = [] def discover(**kwargs): @@ -267,6 +367,7 @@ def discover(**kwargs): print(json.dumps({ "app_id": APP_ID, "node_addresses": node_addresses, + "stale_delete_counts": stale_delete_counts, "baseline_delete_counts": baseline_counts, "final_delete_counts": final_counts, "delete_count_deltas": deltas, diff --git a/extensions/business/deeploy/deeploy_const.py b/extensions/business/deeploy/deeploy_const.py index 5ca05b0c..168cb885 100644 --- a/extensions/business/deeploy/deeploy_const.py +++ b/extensions/business/deeploy/deeploy_const.py @@ -68,6 +68,8 @@ class DEEPLOY_KEYS: # Config keys DATE_UPDATED = "date_updated" DATE_CREATED = "date_created" + LIFECYCLE_GENERATION = "lifecycle_generation" + LIFECYCLE_OPERATION = "lifecycle_operation" class DEEPLOY_STATUS: @@ -207,6 +209,7 @@ class DEEPLOY_PLUGIN_DATA: APP_ID = "app_id" NODE = "NODE" CHAINSTORE_RESPONSE_KEY = "CHAINSTORE_RESPONSE_KEY" + DEEPLOY_SPECS = "deeploy_specs" CONTAINER_APP_RUNNER_SIGNATURE = 'CONTAINER_APP_RUNNER' WORKER_APP_RUNNER_SIGNATURE = 'WORKER_APP_RUNNER' diff --git a/extensions/business/deeploy/deeploy_mixin.py b/extensions/business/deeploy/deeploy_mixin.py index dbd2c806..83d0f637 100644 --- a/extensions/business/deeploy/deeploy_mixin.py +++ b/extensions/business/deeploy/deeploy_mixin.py @@ -620,6 +620,48 @@ def __check_is_oracle(self, inputs): raise ValueError("Sender {} is not an oracle".format(sender)) return True + def _coerce_deeploy_lifecycle_generation(self, value): + if isinstance(value, bool) or value is None: + return 0 + try: + return max(0, int(float(value))) + except (TypeError, ValueError): + return 0 + + def _advance_deeploy_lifecycle_specs(self, dct_deeploy_specs, operation): + specs = self.deepcopy(dct_deeploy_specs) if isinstance(dct_deeploy_specs, dict) else {} + ts = self.time() + current_generation = self._coerce_deeploy_lifecycle_generation( + specs.get(DEEPLOY_KEYS.LIFECYCLE_GENERATION) + ) + specs[DEEPLOY_KEYS.LIFECYCLE_GENERATION] = current_generation + 1 + specs[DEEPLOY_KEYS.LIFECYCLE_OPERATION] = operation + specs[DEEPLOY_KEYS.DATE_UPDATED] = ts + if DEEPLOY_KEYS.DATE_CREATED not in specs: + specs[DEEPLOY_KEYS.DATE_CREATED] = ts + return specs + + def _make_deeploy_delete_command_content(self, app_id, owner, deeploy_specs): + source_specs = deeploy_specs if isinstance(deeploy_specs, dict) else {} + command_specs = {} + for key in ( + DEEPLOY_KEYS.LIFECYCLE_GENERATION, + DEEPLOY_KEYS.DATE_UPDATED, + DEEPLOY_KEYS.DATE_CREATED, + DEEPLOY_KEYS.JOB_ID, + DEEPLOY_KEYS.PROJECT_ID, + DEEPLOY_KEYS.PROJECT_NAME, + ): + if key in source_specs: + command_specs[key] = self.deepcopy(source_specs[key]) + command_specs[DEEPLOY_KEYS.LIFECYCLE_OPERATION] = "delete" + + return { + ct.CONFIG_STREAM.NAME: app_id, + ct.CONFIG_STREAM.K_OWNER: owner, + ct.CONFIG_STREAM.DEEPLOY_SPECS: command_specs, + } + def __create_pipeline_on_nodes(self, nodes, inputs, app_id, app_alias, app_type, owner, job_app_type=None, dct_deeploy_specs=None): """ Create new pipelines on each node and set CSTORE `response_key` for the "callback" action @@ -678,6 +720,10 @@ def __create_pipeline_on_nodes(self, nodes, inputs, app_id, app_alias, app_type, detected_job_app_type = job_app_type or self.deeploy_detect_job_app_type(plugins) if detected_job_app_type in JOB_APP_TYPES_ALL: dct_deeploy_specs[DEEPLOY_KEYS.JOB_APP_TYPE] = detected_job_app_type + dct_deeploy_specs = self._advance_deeploy_lifecycle_specs( + dct_deeploy_specs, + operation="create", + ) plugins = self._autowire_native_container_semaphore( app_id=app_id, @@ -799,6 +845,10 @@ def __update_pipeline_on_nodes(self, nodes, inputs, app_id, app_alias, app_type, dct_deeploy_specs, pipeline_params=pipeline_params, ) + dct_deeploy_specs = self._advance_deeploy_lifecycle_specs( + dct_deeploy_specs, + operation="update", + ) requested_by_instance_id, requested_by_signature, new_plugin_configs = self._organize_requested_plugins(inputs) @@ -1026,7 +1076,6 @@ def _prepare_updated_deeploy_specs(self, owner, app_id, job_id, discovered_plugi return None refreshed_specs = self.deepcopy(specs) - refreshed_specs[DEEPLOY_KEYS.DATE_UPDATED] = self.time() refreshed_specs = self._ensure_deeploy_specs_job_config(refreshed_specs) return refreshed_specs @@ -2891,6 +2940,21 @@ def _discover_plugin_instances( iter_plugins = [] if target_nodes is not None and node not in target_nodes: continue + + def _append_discovered_plugin(pipeline_app_id, pipeline_specs, current_plugin_signature, instance_dict): + current_instance_id = instance_dict[NetMonCt.PLUGIN_INSTANCE] + chainstore_key = instance_dict['instance_conf'].get(self.ct.BIZ_PLUGIN_DATA.CHAINSTORE_RESPONSE_KEY, None) + iter_plugins.append({ + DEEPLOY_PLUGIN_DATA.APP_ID: pipeline_app_id, + DEEPLOY_PLUGIN_DATA.INSTANCE_ID: current_instance_id, + DEEPLOY_PLUGIN_DATA.PLUGIN_SIGNATURE: current_plugin_signature, + DEEPLOY_PLUGIN_DATA.PLUGIN_INSTANCE: instance_dict, + DEEPLOY_PLUGIN_DATA.NODE: node, + DEEPLOY_PLUGIN_DATA.CHAINSTORE_RESPONSE_KEY: chainstore_key, + DEEPLOY_PLUGIN_DATA.DEEPLOY_SPECS: self.deepcopy(pipeline_specs or {}), + }) + return + # search by job_id if job_id is not None: for current_pipeline_app_id, pipeline in pipelines.items(): @@ -2901,59 +2965,50 @@ def _discover_plugin_instances( for current_plugin_signature, plugins_instances in pipeline[NetMonCt.PLUGINS].items(): for instance_dict in plugins_instances: current_instance_id = instance_dict[NetMonCt.PLUGIN_INSTANCE] - chainstore_key = instance_dict['instance_conf'].get(self.ct.BIZ_PLUGIN_DATA.CHAINSTORE_RESPONSE_KEY, None) if current_plugin_signature == plugin_signature and current_instance_id == instance_id: # If we find a match by signature and instance_id, add it to the list and break. - iter_plugins.append({ - DEEPLOY_PLUGIN_DATA.APP_ID: current_pipeline_app_id, - DEEPLOY_PLUGIN_DATA.INSTANCE_ID: current_instance_id, - DEEPLOY_PLUGIN_DATA.PLUGIN_SIGNATURE: current_plugin_signature, - DEEPLOY_PLUGIN_DATA.PLUGIN_INSTANCE: instance_dict, - DEEPLOY_PLUGIN_DATA.NODE: node, - DEEPLOY_PLUGIN_DATA.CHAINSTORE_RESPONSE_KEY: chainstore_key, - }) + _append_discovered_plugin( + pipeline_app_id=current_pipeline_app_id, + pipeline_specs=current_pipeline_deeploy_specs, + current_plugin_signature=current_plugin_signature, + instance_dict=instance_dict, + ) break if instance_id is None and (plugin_signature is None or plugin_signature == current_plugin_signature): # If no specific signature or instance_id is provided, add all instances - iter_plugins.append({ - DEEPLOY_PLUGIN_DATA.APP_ID: current_pipeline_app_id, - DEEPLOY_PLUGIN_DATA.INSTANCE_ID: current_instance_id, - DEEPLOY_PLUGIN_DATA.PLUGIN_SIGNATURE: current_plugin_signature, - DEEPLOY_PLUGIN_DATA.PLUGIN_INSTANCE: instance_dict, - DEEPLOY_PLUGIN_DATA.NODE: node, - DEEPLOY_PLUGIN_DATA.CHAINSTORE_RESPONSE_KEY: chainstore_key, - }) + _append_discovered_plugin( + pipeline_app_id=current_pipeline_app_id, + pipeline_specs=current_pipeline_deeploy_specs, + current_plugin_signature=current_plugin_signature, + instance_dict=instance_dict, + ) # search by app_id if len(iter_plugins) > 0: discovered_plugins.extend(iter_plugins) continue if app_id is not None and app_id in pipelines: + current_pipeline_deeploy_specs = pipelines[app_id].get(NetMonCt.DEEPLOY_SPECS, None) for current_plugin_signature, plugins_instances in pipelines[app_id][NetMonCt.PLUGINS].items(): # plugins_instances is a list of dictionaries for instance_dict in plugins_instances: current_instance_id = instance_dict[NetMonCt.PLUGIN_INSTANCE] - chainstore_key = instance_dict['instance_conf'].get(self.ct.BIZ_PLUGIN_DATA.CHAINSTORE_RESPONSE_KEY, None) if current_plugin_signature == plugin_signature and current_instance_id == instance_id: # If we find a match by signature and instance_id, add it to the list and break. - iter_plugins.append({ - DEEPLOY_PLUGIN_DATA.APP_ID : app_id, - DEEPLOY_PLUGIN_DATA.INSTANCE_ID : current_instance_id, - DEEPLOY_PLUGIN_DATA.PLUGIN_SIGNATURE : current_plugin_signature, - DEEPLOY_PLUGIN_DATA.PLUGIN_INSTANCE : instance_dict, - DEEPLOY_PLUGIN_DATA.NODE: node, - DEEPLOY_PLUGIN_DATA.CHAINSTORE_RESPONSE_KEY: chainstore_key, - }) + _append_discovered_plugin( + pipeline_app_id=app_id, + pipeline_specs=current_pipeline_deeploy_specs, + current_plugin_signature=current_plugin_signature, + instance_dict=instance_dict, + ) break if instance_id is None and (plugin_signature is None or plugin_signature == current_plugin_signature): # If no specific signature or instance_id is provided, add all instances - iter_plugins.append({ - DEEPLOY_PLUGIN_DATA.APP_ID : app_id, - DEEPLOY_PLUGIN_DATA.INSTANCE_ID : current_instance_id, - DEEPLOY_PLUGIN_DATA.PLUGIN_SIGNATURE : current_plugin_signature, - DEEPLOY_PLUGIN_DATA.PLUGIN_INSTANCE : instance_dict, - DEEPLOY_PLUGIN_DATA.NODE: node, - DEEPLOY_PLUGIN_DATA.CHAINSTORE_RESPONSE_KEY: chainstore_key, - }) + _append_discovered_plugin( + pipeline_app_id=app_id, + pipeline_specs=current_pipeline_deeploy_specs, + current_plugin_signature=current_plugin_signature, + instance_dict=instance_dict, + ) # endfor each instance # endfor each plugin signature # endif app_id found @@ -3224,7 +3279,10 @@ def prepare_create_update_pipelines(self, base_pipeline, new_nodes, update_nodes ) if isinstance(deeploy_specs, dict): deeploy_specs[DEEPLOY_KEYS.CURRENT_TARGET_NODES] = chainstore_peers - deeploy_specs[DEEPLOY_KEYS.DATE_UPDATED] = self.time() + deeploy_specs = self._advance_deeploy_lifecycle_specs( + deeploy_specs, + operation="scale", + ) base_pipeline[NetMonCt.DEEPLOY_SPECS] = self.deepcopy(deeploy_specs) chainstore_response_keys = self.defaultdict(list) @@ -3455,12 +3513,13 @@ def delete_pipeline_from_nodes(self, app_id=None, job_id=None, owner=None, targe for instance in discovered_instances: node = instance[DEEPLOY_PLUGIN_DATA.NODE] pipeline_app_id = instance[DEEPLOY_PLUGIN_DATA.APP_ID] + deeploy_specs = instance.get(DEEPLOY_PLUGIN_DATA.DEEPLOY_SPECS, {}) target_key = (node, pipeline_app_id) if target_key in seen_targets: duplicate_count += 1 continue seen_targets.add(target_key) - unique_targets.append(target_key) + unique_targets.append((node, pipeline_app_id, deeploy_specs)) if duplicate_count: self.P( @@ -3469,11 +3528,16 @@ def delete_pipeline_from_nodes(self, app_id=None, job_id=None, owner=None, targe color='y', ) - for node, pipeline_app_id in unique_targets: + for node, pipeline_app_id, deeploy_specs in unique_targets: self.P(f"Stopping pipeline '{pipeline_app_id}' on {node}") self.cmdapi_stop_pipeline( node_address=node, name=pipeline_app_id, + command_content=self._make_deeploy_delete_command_content( + app_id=pipeline_app_id, + owner=owner, + deeploy_specs=deeploy_specs, + ), ) #endfor each target node return discovered_instances diff --git a/extensions/business/deeploy/tests/test_create_requests.py b/extensions/business/deeploy/tests/test_create_requests.py index 5d8e61fc..9e5a85f4 100644 --- a/extensions/business/deeploy/tests/test_create_requests.py +++ b/extensions/business/deeploy/tests/test_create_requests.py @@ -6,6 +6,23 @@ class DeeployCreateRequestPreparationTests(unittest.TestCase): + def test_advance_lifecycle_specs_increments_generation(self): + plugin = make_deeploy_plugin() + plugin.time = lambda: 123.0 + + specs = plugin._advance_deeploy_lifecycle_specs( + { + DEEPLOY_KEYS.LIFECYCLE_GENERATION: 4, + DEEPLOY_KEYS.DATE_CREATED: 100.0, + }, + operation="update", + ) + + self.assertEqual(specs[DEEPLOY_KEYS.LIFECYCLE_GENERATION], 5) + self.assertEqual(specs[DEEPLOY_KEYS.LIFECYCLE_OPERATION], "update") + self.assertEqual(specs[DEEPLOY_KEYS.DATE_UPDATED], 123.0) + self.assertEqual(specs[DEEPLOY_KEYS.DATE_CREATED], 100.0) + def test_prepare_single_plugin_instance_uses_signature_and_app_params(self): plugin = make_deeploy_plugin() inputs = make_inputs( diff --git a/extensions/business/deeploy/tests/test_delete_pipeline_commands.py b/extensions/business/deeploy/tests/test_delete_pipeline_commands.py index 19661ab5..2725002e 100644 --- a/extensions/business/deeploy/tests/test_delete_pipeline_commands.py +++ b/extensions/business/deeploy/tests/test_delete_pipeline_commands.py @@ -1,3 +1,4 @@ +import copy import sys import types import unittest @@ -25,9 +26,10 @@ def decorator(fn): from extensions.business.deeploy.deeploy_const import DEEPLOY_KEYS, DEEPLOY_PLUGIN_DATA, DEEPLOY_STATUS from extensions.business.deeploy.deeploy_manager_api import DeeployManagerApiPlugin from extensions.business.deeploy.tests.support import InputsStub, make_deeploy_plugin +from naeural_core import constants as ct -def _discovered_instance(app_id, node, signature, instance_id): +def _discovered_instance(app_id, node, signature, instance_id, lifecycle_generation=3, date_updated=300.0): return { DEEPLOY_PLUGIN_DATA.APP_ID: app_id, DEEPLOY_PLUGIN_DATA.NODE: node, @@ -38,14 +40,25 @@ def _discovered_instance(app_id, node, signature, instance_id): "instance_conf": {}, }, DEEPLOY_PLUGIN_DATA.CHAINSTORE_RESPONSE_KEY: None, + DEEPLOY_PLUGIN_DATA.DEEPLOY_SPECS: { + DEEPLOY_KEYS.LIFECYCLE_GENERATION: lifecycle_generation, + DEEPLOY_KEYS.DATE_UPDATED: date_updated, + DEEPLOY_KEYS.JOB_ID: 77, + }, } def _make_delete_plugin(): plugin = make_deeploy_plugin() plugin.stop_calls = [] + plugin.stop_payloads = [] plugin.logs = [] - plugin.cmdapi_stop_pipeline = lambda node_address, name: plugin.stop_calls.append((node_address, name)) + + def stop_pipeline(node_address, name, command_content=None): + plugin.stop_calls.append((node_address, name)) + plugin.stop_payloads.append(command_content) + + plugin.cmdapi_stop_pipeline = stop_pipeline plugin.P = lambda msg, *args, **kwargs: plugin.logs.append(str(msg)) plugin.Pd = lambda msg, *args, **kwargs: plugin.logs.append(str(msg)) return plugin @@ -71,6 +84,39 @@ def test_multiplugin_single_node_delete_emits_one_pipeline_stop(self): self.assertEqual(plugin.stop_calls, [("0xai_node_1", "sentinelapi-0_d60b35d")]) self.assertTrue(any("duplicate" in log.lower() or "collapsed" in log.lower() for log in plugin.logs)) + def test_delete_command_uses_discovered_lifecycle_generation(self): + plugin = _make_delete_plugin() + discovered = [ + _discovered_instance( + "app-1", + "0xai_node_1", + "PLUGIN_A", + "a-1", + lifecycle_generation=4, + date_updated=400.0, + ), + ] + + plugin.delete_pipeline_from_nodes( + app_id="app-1", + owner="0xOwner", + discovered_instances=discovered, + ) + + self.assertEqual(plugin.stop_calls, [("0xai_node_1", "app-1")]) + self.assertEqual(len(plugin.stop_payloads), 1) + payload = plugin.stop_payloads[0] + self.assertEqual(payload[ct.CONFIG_STREAM.NAME], "app-1") + self.assertEqual(payload[ct.CONFIG_STREAM.K_OWNER], "0xOwner") + self.assertEqual( + payload[ct.CONFIG_STREAM.DEEPLOY_SPECS][DEEPLOY_KEYS.LIFECYCLE_GENERATION], + 4, + ) + self.assertEqual( + payload[ct.CONFIG_STREAM.DEEPLOY_SPECS][DEEPLOY_KEYS.DATE_UPDATED], + 400.0, + ) + def test_multinode_delete_emits_one_stop_per_node_in_first_discovery_order(self): plugin = _make_delete_plugin() discovered = [ @@ -226,6 +272,7 @@ def test_delete_endpoint_omitted_target_nodes_discovers_all_targets(self): plugin = DeeployManagerApiPlugin.__new__(DeeployManagerApiPlugin) plugin.stop_calls = [] plugin.discovery_calls = [] + plugin.deepcopy = copy.deepcopy plugin.cfg_deeploy_verbose = 0 plugin.P = lambda *args, **kwargs: None plugin.Pd = lambda *args, **kwargs: None @@ -243,7 +290,11 @@ def test_delete_endpoint_omitted_target_nodes_discovers_all_targets(self): DEEPLOY_KEYS.ESCROW_OWNER: "0xOwner", } plugin._DeeployManagerApiPlugin__ensure_eth_balance = lambda: None - plugin.cmdapi_stop_pipeline = lambda node_address, name: plugin.stop_calls.append((node_address, name)) + + def stop_pipeline(node_address, name, command_content=None): + plugin.stop_calls.append((node_address, name)) + + plugin.cmdapi_stop_pipeline = stop_pipeline def discover(**kwargs): plugin.discovery_calls.append(kwargs)