-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathevolution_engine.py
More file actions
226 lines (188 loc) · 7.44 KB
/
Copy pathevolution_engine.py
File metadata and controls
226 lines (188 loc) · 7.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
"""
代码进化引擎
使用 Parallax 本地生成代码变体
"""
import requests
import json
from datetime import datetime
class EvolutionEngine:
def __init__(self, parallax_url="http://localhost:3001"):
"""初始化 Parallax 客户端"""
self.parallax_url = parallax_url
self.api_endpoint = f"{parallax_url}/v1/chat/completions"
# 测试连接
try:
response = requests.get(f"{parallax_url}/health", timeout=2)
print("✅ Parallax 连接成功")
self.available = True
except:
print("⚠️ Parallax 未启动,将使用模拟模式")
print("💡 提示:运行 'parallax run' 启动 Parallax 服务")
self.available = False
def mutate_code(self, code, generation, best_time):
"""生成代码变体"""
if not self.available:
# 模拟模式:返回一些预设的优化版本
return self._mock_mutate(code, generation)
prompt = f"""你是代码优化专家。请优化以下排序函数。
当前代码(第 {generation} 代,耗时 {best_time:.6f}秒):
```python
{code}
```
优化要求:
1. 保持函数签名不变:def sort_array(arr)
2. 必须返回排序后的数组
3. 尝试改进算法效率(减少时间复杂度)
4. 可以尝试:改变算法逻辑、优化循环、使用更好的数据结构
只返回优化后的完整函数代码,不要解释:
```python"""
try:
# 调用 Parallax API
payload = {
"messages": [
{"role": "user", "content": prompt}
],
"max_tokens": 500,
"temperature": 0.7,
"stream": False
}
response = requests.post(
self.api_endpoint,
json=payload,
timeout=30
)
if response.status_code == 200:
result = response.json()
content = result['choices'][0]['message']['content']
# 提取代码
code_start = content.find("def sort_array")
if code_start != -1:
code_end = content.find("```", code_start)
if code_end != -1:
new_code = content[code_start:code_end].strip()
else:
new_code = content[code_start:].strip()
return new_code
else:
return code
else:
print(f"⚠️ API 错误: {response.status_code}")
return code
except Exception as e:
print(f"⚠️ 代码生成失败: {e}")
return code
def _mock_mutate(self, code, generation):
"""模拟模式:返回预设的优化版本"""
# 简单的优化序列
optimizations = [
# 优化 1:添加提前退出
"""def sort_array(arr):
n = len(arr)
for i in range(n):
swapped = False
for j in range(n - i - 1):
if arr[j] > arr[j + 1]:
arr[j], arr[j + 1] = arr[j + 1], arr[j]
swapped = True
if not swapped:
break
return arr""",
# 优化 2:插入排序
"""def sort_array(arr):
for i in range(1, len(arr)):
key = arr[i]
j = i - 1
while j >= 0 and arr[j] > key:
arr[j + 1] = arr[j]
j -= 1
arr[j + 1] = key
return arr""",
# 优化 3:快速排序
"""def sort_array(arr):
if len(arr) <= 1:
return arr
pivot = arr[len(arr) // 2]
left = [x for x in arr if x < pivot]
middle = [x for x in arr if x == pivot]
right = [x for x in arr if x > pivot]
return sort_array(left) + middle + sort_array(right)""",
]
# 根据代数返回不同的优化
idx = min(generation // 5, len(optimizations) - 1)
return optimizations[idx]
def evolve(self, initial_code, generations=50, variants_per_gen=3):
"""核心进化循环"""
from code_executor import CodeExecutor
executor = CodeExecutor()
current_best = initial_code
best_time = executor.measure_time(current_best)
history = [{
"generation": 0,
"code": current_best,
"time": best_time,
"timestamp": datetime.now().isoformat()
}]
print(f"\n🧬 开始进化...")
print(f"Generation 0: {best_time:.6f}s (baseline)")
for gen in range(1, generations + 1):
improved = False
# 生成多个变体
for variant_idx in range(variants_per_gen):
print(f"\n 生成变体 {variant_idx + 1}/{variants_per_gen}...", end=" ")
# 生成变体
variant = self.mutate_code(current_best, gen, best_time)
# 测试变体
variant_time = executor.measure_time(variant)
if variant_time is None:
print("❌ 执行失败")
continue
print(f"{variant_time:.6f}s", end=" ")
# 如果更好,更新
if variant_time < best_time:
improvement = ((best_time - variant_time) / best_time) * 100
print(f"✅ 提升 {improvement:.1f}%")
current_best = variant
best_time = variant_time
improved = True
# 记录历史
history.append({
"generation": gen,
"code": current_best,
"time": best_time,
"improvement": improvement,
"timestamp": datetime.now().isoformat()
})
break # 找到更好的就进入下一代
else:
print("⚪ 无改进")
if not improved:
print(f"\nGeneration {gen}: 无改进,继续探索...")
else:
print(f"\n✨ Generation {gen}: {best_time:.6f}s (best so far)")
print(f"\n🎉 进化完成!")
print(f"最终性能: {best_time:.6f}s")
print(f"总提升: {((history[0]['time'] - best_time) / history[0]['time'] * 100):.1f}%")
# 保存历史
with open('evolution_history.json', 'w') as f:
json.dump(history, f, indent=2, ensure_ascii=False)
return {
"final_code": current_best,
"final_time": best_time,
"history": history
}
if __name__ == "__main__":
# 初始代码:冒泡排序
initial_code = """def sort_array(arr):
n = len(arr)
for i in range(n):
for j in range(n - i - 1):
if arr[j] > arr[j + 1]:
arr[j], arr[j + 1] = arr[j + 1], arr[j]
return arr"""
# 开始进化
engine = EvolutionEngine()
result = engine.evolve(initial_code, generations=20, variants_per_gen=3)
print("\n" + "="*50)
print("最终代码:")
print("="*50)
print(result['final_code'])