Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docker-compose/deeploy-testbed.local-core.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
services:
deeploy_test_node_1:
volumes:
- ${DEEPLOY_TESTBED_CORE_PATH:?set DEEPLOY_TESTBED_CORE_PATH to the local naeural_core package directory}:/opt/python/lib/python3.13/site-packages/naeural_core:ro

deeploy_test_node_2:
volumes:
- ${DEEPLOY_TESTBED_CORE_PATH:?set DEEPLOY_TESTBED_CORE_PATH to the local naeural_core package directory}:/opt/python/lib/python3.13/site-packages/naeural_core:ro
2 changes: 1 addition & 1 deletion docker-compose/deeploy-testbed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ x-edge-node-env: &edge-node-env
EE_TUNNEL_ENGINE: none

x-edge-node: &edge-node
image: local_edge_node_deeploy_0002
image: local_edge_node_deeploy_testbed
build:
context: ..
dockerfile: Dockerfile_devnet
Expand Down
13 changes: 13 additions & 0 deletions docker-compose/deeploy-testbed/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,22 @@ PYTHONPATH=/mnt/c/repos/naeural_core:/mnt/c/repos/naeural_client:. /home/bleot/v
docker compose -f docker-compose/deeploy-testbed.yaml down -v
```

When validating local `naeural_core` receiver changes before they are published,
mount the local package into both node containers:

```bash
DEEPLOY_TESTBED_CORE_PATH=/absolute/path/to/naeural_core/naeural_core \
docker compose -f docker-compose/deeploy-testbed.yaml -f docker-compose/deeploy-testbed.local-core.yaml up -d --build
PYTHONPATH=/absolute/path/to/naeural_core:/absolute/path/to/naeural_client:. python docker-compose/deeploy-testbed/validate_delete_workflow.py
DEEPLOY_TESTBED_CORE_PATH=/absolute/path/to/naeural_core/naeural_core \
docker compose -f docker-compose/deeploy-testbed.yaml -f docker-compose/deeploy-testbed.local-core.yaml down -v
```

The validation script creates one Deeploy-like multi-plugin app pipeline on
each local node, calls the real `delete_pipeline_from_nodes()` path, and asserts
that each node receives exactly one `DELETE_CONFIG` for the app.
It also updates the pipeline to a newer Deeploy lifecycle generation, sends an
older structured delete command, and asserts the newer saved config remains.

The app uses testbed-only data/business plugins mounted into the node containers
under the normal root plugin search paths.
Expand Down
117 changes: 109 additions & 8 deletions docker-compose/deeploy-testbed/validate_delete_workflow.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/usr/bin/env python3
import copy
import json
import os
import subprocess
Expand Down Expand Up @@ -80,6 +81,10 @@ def stream_exists(container, app_id):
return result.returncode == 0


def read_stream_config(container, app_id):
return read_json_file(container, stream_config_path(app_id))


def wait_for_stream_state(container, app_id, expected_exists):
deadline = time.time() + POLL_TIMEOUT
while time.time() < deadline:
Expand All @@ -90,6 +95,26 @@ def wait_for_stream_state(container, app_id, expected_exists):
raise TimeoutError(f"Timed out waiting for {app_id} to {state} on {container}")


def wait_for_stream_generation(container, app_id, generation):
deadline = time.time() + POLL_TIMEOUT
last_generation = None
while time.time() < deadline:
if stream_exists(container, app_id):
try:
config = read_stream_config(container, app_id)
specs = config.get("DEEPLOY_SPECS", {})
last_generation = specs.get(DEEPLOY_KEYS.LIFECYCLE_GENERATION)
if last_generation == generation:
return config
except Exception:
pass
time.sleep(2)
raise TimeoutError(
f"Timed out waiting for {app_id} generation {generation} on {container}; "
f"last observed generation was {last_generation}"
)


def list_received_command_files(container):
cmd = "find /edge_node/_local_cache/_output/received_commands -type f -name '*.json' -print 2>/dev/null | sort"
result = docker_exec(container, "sh", "-c", cmd, check=False)
Expand All @@ -110,12 +135,28 @@ def count_delete_commands(container, app_id):
data = read_json_file(container, path)
except Exception:
continue
if data.get("ACTION") == "DELETE_CONFIG" and data.get("PAYLOAD") == app_id:
payload = data.get("PAYLOAD")
payload_app_id = payload.get("NAME") if isinstance(payload, dict) else payload
if data.get("ACTION") == "DELETE_CONFIG" and payload_app_id == app_id:
count += 1
return count


def make_pipeline_config(app_id, node_addresses):
def wait_for_delete_command_count(container, app_id, expected_count):
deadline = time.time() + POLL_TIMEOUT
last_count = None
while time.time() < deadline:
last_count = count_delete_commands(container, app_id)
if last_count >= expected_count:
return last_count
time.sleep(2)
raise TimeoutError(
f"Timed out waiting for {expected_count} DELETE_CONFIG command(s) for {app_id} "
f"on {container}; last observed count was {last_count}"
)


def make_pipeline_config(app_id, node_addresses, lifecycle_generation=1, lifecycle_operation="create"):
now = time.time()
return {
ct.CONFIG_STREAM.NAME: app_id,
Expand All @@ -132,7 +173,9 @@ def make_pipeline_config(app_id, node_addresses):
DEEPLOY_KEYS.CURRENT_TARGET_NODES: list(node_addresses),
DEEPLOY_KEYS.DATE_CREATED: now,
DEEPLOY_KEYS.DATE_UPDATED: now,
DEEPLOY_KEYS.JOB_TAGS: ["deeploy-0002-e2e"],
DEEPLOY_KEYS.LIFECYCLE_GENERATION: lifecycle_generation,
DEEPLOY_KEYS.LIFECYCLE_OPERATION: lifecycle_operation,
DEEPLOY_KEYS.JOB_TAGS: ["deeploy-0003-e2e"],
DEEPLOY_KEYS.SPARE_NODES: [],
DEEPLOY_KEYS.ALLOW_REPLICATION_IN_THE_WILD: False,
},
Expand All @@ -148,7 +191,27 @@ def make_pipeline_config(app_id, node_addresses):
}


def make_discovered_instances(app_id, node_addresses):
def make_delete_payload(app_id, deeploy_specs):
command_specs = {}
for key in (
DEEPLOY_KEYS.JOB_ID,
DEEPLOY_KEYS.PROJECT_ID,
DEEPLOY_KEYS.LIFECYCLE_GENERATION,
DEEPLOY_KEYS.LIFECYCLE_OPERATION,
DEEPLOY_KEYS.DATE_CREATED,
DEEPLOY_KEYS.DATE_UPDATED,
):
if key in deeploy_specs:
command_specs[key] = copy.deepcopy(deeploy_specs[key])
command_specs[DEEPLOY_KEYS.LIFECYCLE_OPERATION] = "delete"
return {
ct.CONFIG_STREAM.NAME: app_id,
ct.CONFIG_STREAM.K_OWNER: OWNER,
ct.CONFIG_STREAM.DEEPLOY_SPECS: command_specs,
}


def make_discovered_instances(app_id, node_addresses, deeploy_specs):
discovered = []
for node in node_addresses:
for instance_id in PLUGIN_INSTANCES:
Expand All @@ -162,6 +225,7 @@ def make_discovered_instances(app_id, node_addresses):
"instance_conf": {},
},
DEEPLOY_PLUGIN_DATA.CHAINSTORE_RESPONSE_KEY: None,
DEEPLOY_PLUGIN_DATA.DEEPLOY_SPECS: copy.deepcopy(deeploy_specs),
})
return discovered

Expand All @@ -174,9 +238,10 @@ def make_delete_harness(session):
plugin = RuntimeDeleteHarness.__new__(RuntimeDeleteHarness)
plugin.P = lambda msg, *args, **kwargs: print(f"[deeploy-delete] {msg}")
plugin.Pd = plugin.P
plugin.cmdapi_stop_pipeline = lambda node_address, name: session._send_command_delete_pipeline(
plugin.deepcopy = copy.deepcopy
plugin.cmdapi_stop_pipeline = lambda node_address, name, command_content=None: session._send_command_delete_pipeline(
node_address,
name,
command_content if command_content is not None else name,
show_command=False,
)
return plugin
Expand All @@ -197,7 +262,7 @@ def main():
secured=False,
encrypt_comms=False,
root_topic=ROOT_TOPIC,
name="deeploy-0002-e2e",
name="deeploy-0003-e2e",
auto_configuration=False,
run_dauth=False,
use_home_folder=False,
Expand All @@ -220,14 +285,49 @@ def main():

for container in CONTAINERS:
wait_for_stream_state(container, APP_ID, expected_exists=True)
wait_for_stream_generation(container, APP_ID, generation=1)

updated_pipeline_config = make_pipeline_config(
APP_ID,
node_addresses,
lifecycle_generation=2,
lifecycle_operation="update",
)
for node_address in node_addresses:
session._send_command_create_pipeline(node_address, updated_pipeline_config, show_command=False)

for container in CONTAINERS:
wait_for_stream_generation(container, APP_ID, generation=2)

stale_delete_counts = {
container: count_delete_commands(container, APP_ID)
for container in CONTAINERS
}
stale_payload = make_delete_payload(APP_ID, pipeline_config["DEEPLOY_SPECS"])
for node_address in node_addresses:
session._send_command_delete_pipeline(node_address, stale_payload, show_command=False)

for container in CONTAINERS:
wait_for_delete_command_count(container, APP_ID, stale_delete_counts[container] + 1)

# The stale delete has been received; leave the node a short settle window
# before asserting that the newer generation is still the saved config.
time.sleep(5)
for container in CONTAINERS:
wait_for_stream_state(container, APP_ID, expected_exists=True)
wait_for_stream_generation(container, APP_ID, generation=2)

baseline_counts = {
container: count_delete_commands(container, APP_ID)
for container in CONTAINERS
}

harness = make_delete_harness(session)
discovered = make_discovered_instances(APP_ID, node_addresses)
discovered = make_discovered_instances(
APP_ID,
node_addresses,
updated_pipeline_config["DEEPLOY_SPECS"],
)
discovery_calls = []

def discover(**kwargs):
Expand Down Expand Up @@ -267,6 +367,7 @@ def discover(**kwargs):
print(json.dumps({
"app_id": APP_ID,
"node_addresses": node_addresses,
"stale_delete_counts": stale_delete_counts,
"baseline_delete_counts": baseline_counts,
"final_delete_counts": final_counts,
"delete_count_deltas": deltas,
Expand Down
3 changes: 3 additions & 0 deletions extensions/business/deeploy/deeploy_const.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class DEEPLOY_KEYS:
# Config keys
DATE_UPDATED = "date_updated"
DATE_CREATED = "date_created"
LIFECYCLE_GENERATION = "lifecycle_generation"
LIFECYCLE_OPERATION = "lifecycle_operation"


class DEEPLOY_STATUS:
Expand Down Expand Up @@ -207,6 +209,7 @@ class DEEPLOY_PLUGIN_DATA:
APP_ID = "app_id"
NODE = "NODE"
CHAINSTORE_RESPONSE_KEY = "CHAINSTORE_RESPONSE_KEY"
DEEPLOY_SPECS = "deeploy_specs"

CONTAINER_APP_RUNNER_SIGNATURE = 'CONTAINER_APP_RUNNER'
WORKER_APP_RUNNER_SIGNATURE = 'WORKER_APP_RUNNER'
Expand Down
Loading