# evolution_engine_distributed.py
"""
分布式代码进化引擎
使用多台设备的 MLX Worker 服务
"""
import requests
import json
from datetime import datetime
import random
import concurrent.futures
class EvolutionEngineDistributed:
    """Code-evolution engine that spreads LLM mutation requests over HTTP workers.

    Each worker is addressed by a base URL. As used by this class, a worker
    exposes ``GET /health`` (JSON containing ``model_loaded``) and
    ``POST /generate`` (JSON request with ``prompt`` and ``max_tokens``, JSON
    reply containing ``response``).

    Attributes set by this class:
        worker_urls: the configured worker base URLs.
        available_workers: the subset that passed the last health check.
        available: True when at least one worker passed the last health check.
    """

    def __init__(self, worker_urls=None):
        """Store the worker list and immediately probe which workers are usable.

        Args:
            worker_urls: list of worker base URLs, e.g.
                ``["http://192.168.1.101:8001", "http://192.168.1.102:8001"]``.
                When None, a single worker at ``http://localhost:8001`` is tried.
        """
        if worker_urls is None:
            # Default to a worker running on this machine.
            worker_urls = ["http://localhost:8001"]
        self.worker_urls = worker_urls
        self.available_workers = []
        # Defined up front so the attribute exists even if the probe is interrupted.
        self.available = False
        print(f"🔍 检测 {len(worker_urls)} 个 Worker...")
        self.check_workers()

    def check_workers(self):
        """Probe every configured worker and rebuild ``available_workers``.

        A worker counts as available when ``GET {url}/health`` answers with
        status 200 and a JSON body whose ``model_loaded`` value is truthy.
        Updates ``self.available_workers`` and ``self.available``.
        """
        self.available_workers = []
        for url in self.worker_urls:
            try:
                response = requests.get(f"{url}/health", timeout=2)
                if response.status_code != 200:
                    # Previously a non-200 answer was skipped without any output.
                    print(f" ❌ {url} 不可用")
                    continue
                data = response.json()
                if data.get('model_loaded'):
                    self.available_workers.append(url)
                    print(f" ✅ {url}")
                else:
                    print(f" ⚠️ {url} (模型未加载)")
            except Exception:
                # Best-effort probe: any failure marks the worker unavailable.
                # Not a bare ``except`` so Ctrl-C / SystemExit still propagate.
                print(f" ❌ {url} 不可用")
        print(f"\n可用 Worker: {len(self.available_workers)}/{len(self.worker_urls)}")
        self.available = len(self.available_workers) > 0

    @staticmethod
    def _build_prompt(code, generation, best_time):
        """Render the optimisation prompt sent to a worker for ``code``."""
        return (
            "你是代码优化专家。请优化以下排序函数。\n"
            f"当前代码(第 {generation} 代,耗时 {best_time:.6f}秒):\n"
            "```python\n"
            f"{code}\n"
            "```\n"
            "优化要求:\n"
            "1. 保持函数签名不变:def sort_array(arr)\n"
            "2. 必须返回排序后的数组\n"
            "3. 尝试改进算法效率\n"
            "4. 可以尝试:改变算法逻辑、优化循环、使用更好的数据结构\n"
            "只返回优化后的完整函数代码,不要解释:\n"
            "```python"
        )

    @staticmethod
    def _extract_function(content, marker="def sort_array"):
        """Pull the ``sort_array`` definition out of a worker reply.

        The text from ``marker`` up to the next code fence (or the end of the
        reply) is taken; then the ``def`` line plus every following indented
        line is kept. Blank lines are dropped and the first non-indented line
        (for example trailing prose) ends the function.

        Returns:
            The extracted source, or None when ``marker`` does not occur.
        """
        start = content.find(marker)
        if start == -1:
            return None
        fence = content.find("```", start)
        snippet = content[start:] if fence == -1 else content[start:fence]
        lines = snippet.strip().splitlines()
        kept = lines[:1]
        for line in lines[1:]:
            if not line.strip():
                continue
            if not line[0].isspace():
                # Back at column 0: the function body is over.
                break
            kept.append(line)
        return "\n".join(kept) if kept else None

    def mutate_code(self, code, generation, best_time, worker_url=None):
        """Ask one worker for an optimised variant of ``code``.

        Args:
            code: source of the current best ``sort_array`` implementation.
            generation: generation number, only used in the prompt text.
            best_time: current best time in seconds, only used in the prompt text.
            worker_url: worker to query; a random available worker when None.

        Returns:
            The extracted variant, or ``code`` unchanged when no worker is
            available, the request fails, the worker answers with a non-200
            status, or the reply contains no ``def sort_array``.
        """
        if not self.available_workers:
            return code
        if worker_url is None:
            worker_url = random.choice(self.available_workers)

        prompt = self._build_prompt(code, generation, best_time)
        try:
            response = requests.post(
                f"{worker_url}/generate",
                json={
                    "prompt": prompt,
                    "max_tokens": 300,
                },
                timeout=60,
            )
            if response.status_code != 200:
                print(f"⚠️ Worker 返回错误: {response.status_code}")
                return code
            content = response.json()['response']
            # Falls back to the input when nothing usable was extracted.
            return self._extract_function(content) or code
        except Exception as e:
            # Deliberate best-effort boundary: a failing worker must not abort
            # the run, so the failure is reported and the input code is kept.
            print(f"⚠️ Worker {worker_url} 失败: {e}")
            return code

    def mutate_code_parallel(self, code, generation, best_time, num_variants=3):
        """Generate several variants concurrently, one request per variant.

        Variants are assigned to available workers round-robin and the number
        of threads is capped at the number of available workers.

        Returns:
            A list of variant sources in completion order. Empty when there is
            no available worker or ``num_variants`` is not positive.
        """
        workers = self.available_workers
        if not workers or num_variants <= 0:
            # ThreadPoolExecutor rejects max_workers=0 and the round-robin
            # below would divide by zero, so bail out early.
            return []

        variants = []
        pool_size = min(num_variants, len(workers))
        with concurrent.futures.ThreadPoolExecutor(max_workers=pool_size) as pool:
            futures = [
                pool.submit(
                    self.mutate_code,
                    code,
                    generation,
                    best_time,
                    workers[i % len(workers)],
                )
                for i in range(num_variants)
            ]
            for future in concurrent.futures.as_completed(futures):
                try:
                    variants.append(future.result())
                except Exception as e:
                    print(f"⚠️ 变体生成失败: {e}")
        return variants

    def evolve(self, initial_code, generations=50, variants_per_gen=3):
        """Run the evolution loop and persist the history to disk.

        For each generation a batch of variants is generated in parallel and
        timed with ``CodeExecutor.measure_time``. The first variant that beats
        the current best time is adopted and the rest of the batch is skipped.
        The history is written to ``evolution_history.json`` in the current
        working directory.

        Args:
            initial_code: source of the starting ``sort_array`` implementation.
            generations: number of generations to run.
            variants_per_gen: variants requested per generation.

        Returns:
            dict with ``final_code``, ``final_time`` and ``history``.

        Raises:
            ValueError: when the initial code cannot be timed (``measure_time``
                returned None), since no baseline exists to compare against.
        """
        from code_executor import CodeExecutor

        executor = CodeExecutor()
        current_best = initial_code
        best_time = executor.measure_time(current_best)
        if best_time is None:
            # None is how measure_time reports a failed run (see the variant
            # handling below); formatting it would die with a TypeError.
            raise ValueError(
                "initial_code failed to execute; cannot establish a baseline time"
            )
        baseline_time = best_time
        history = [{
            "generation": 0,
            "code": current_best,
            "time": best_time,
            "timestamp": datetime.now().isoformat(),
        }]
        print(f"\n🧬 开始进化(分布式模式,{len(self.available_workers)} 个 Worker)...")
        print(f"Generation 0: {best_time:.6f}s (baseline)")

        for gen in range(1, generations + 1):
            print(f"\n🔄 Generation {gen}: 并行生成 {variants_per_gen} 个变体...")
            variants = self.mutate_code_parallel(
                current_best, gen, best_time, variants_per_gen
            )

            improved = False
            for idx, variant in enumerate(variants):
                variant_time = executor.measure_time(variant)
                if variant_time is None:
                    print(f" 变体 {idx+1}: ❌ 执行失败")
                    continue
                print(f" 变体 {idx+1}: {variant_time:.6f}s", end=" ")
                if variant_time < best_time:
                    improvement = ((best_time - variant_time) / best_time) * 100
                    print(f"✅ 提升 {improvement:.1f}%")
                    current_best = variant
                    best_time = variant_time
                    improved = True
                    history.append({
                        "generation": gen,
                        "code": current_best,
                        "time": best_time,
                        "improvement": improvement,
                        "timestamp": datetime.now().isoformat(),
                    })
                    # First improvement wins; remaining variants are not timed.
                    break
                else:
                    print("⚪ 无改进")

            if not improved:
                print(f"Generation {gen}: 无改进,继续探索...")
            else:
                print(f"✨ Generation {gen}: {best_time:.6f}s (best so far)")

        # A zero baseline (timer resolution) must not crash the summary
        # before the history has been saved.
        if baseline_time > 0:
            total_gain = (baseline_time - best_time) / baseline_time * 100
        else:
            total_gain = 0.0
        print("\n🎉 进化完成!")
        print(f"最终性能: {best_time:.6f}s")
        print(f"总提升: {total_gain:.1f}%")

        # Explicit UTF-8: ensure_ascii=False emits non-ASCII text verbatim.
        with open('evolution_history.json', 'w', encoding='utf-8') as f:
            json.dump(history, f, indent=2, ensure_ascii=False)

        return {
            "final_code": current_best,
            "final_time": best_time,
            "history": history,
        }
def main():
    """Run a short demo evolution of a bubble sort against the configured workers.

    Exits with status 1 when no worker passes the health check.
    """
    import sys

    print("🧬 代码进化实验室 - 分布式模式")
    print("=" * 50)

    # Worker addresses; add the base URLs of other devices here.
    worker_urls = [
        "http://localhost:8001",  # local worker
        # "http://192.168.1.101:8001",  # device 2
        # "http://192.168.1.102:8001",  # device 3
    ]
    engine = EvolutionEngineDistributed(worker_urls)
    if not engine.available:
        print("\n❌ 没有可用的 Worker")
        print("\n💡 启动 Worker:")
        print(" python mlx_worker.py")
        sys.exit(1)

    # Seed implementation for the demo run. The body indentation is part of
    # the string and must be valid Python for the seed to be executable.
    initial_code = """def sort_array(arr):
    n = len(arr)
    for i in range(n):
        for j in range(n - i - 1):
            if arr[j] > arr[j + 1]:
                arr[j], arr[j + 1] = arr[j + 1], arr[j]
    return arr"""
    result = engine.evolve(initial_code, generations=5, variants_per_gen=3)

    print("\n" + "="*50)
    print("最终代码:")
    print("="*50)
    print(result['final_code'])


if __name__ == "__main__":
    main()