-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
466 lines (391 loc) · 19.6 KB
/
Copy pathapp.py
File metadata and controls
466 lines (391 loc) · 19.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
import gradio as gr
import os
from dotenv import load_dotenv
load_dotenv()
import modal
from mcp_server import MCPServer
from utils import text_to_speech, map_sound_to_vibration
from utils import text_to_speech, map_sound_to_vibration
import time
import numpy as np
from PIL import Image
import io
import asyncio
# Initialize MCP Server
mcp = MCPServer()

# Modal class lookup.
# Assumes `modal deploy modal_app.py` has been run. If any lookup or
# instantiation fails, every Modal-backed handle is set to None so the
# handlers can report "backend not connected" instead of crashing.
try:
    ModelInference = modal.Cls.from_name("accessibility-companion", "ModelInference")
    model_inference = ModelInference()

    # Hearing Assistant Backend
    HearingModel = modal.Cls.from_name("accessibility-companion", "HearingModel")
    hearing_model = HearingModel()

    # Emotion Detection Backend
    EmotionModel = modal.Cls.from_name("accessibility-companion", "EmotionModel")
    emotion_model = EmotionModel()

    # Speaker Diarization Backend
    DiarizationModel = modal.Cls.from_name("accessibility-companion", "DiarizationModel")
    diarization_model = DiarizationModel()
except Exception as e:
    print(f"Could not lookup Modal app: {e}")
    print("Ensure you have deployed the app with `modal deploy modal_app.py`")
    model_inference = None
    hearing_model = None
    emotion_model = None
    diarization_model = None

# Optional: PaddleOCR Modal service for faster OCR.
# Enabled unless USE_PADDLE_OCR is set to something other than "1".
use_paddle_ocr = os.getenv("USE_PADDLE_OCR", "1") == "1"
try:
    PaddleOCRService = modal.Cls.from_name("paddleocr-service", "PaddleOCRService")
    paddle_ocr_client = PaddleOCRService()
except Exception as e:
    paddle_ocr_client = None
    # Only complain when the user actually asked for PaddleOCR.
    if use_paddle_ocr:
        print(f"PaddleOCR service unavailable: {e}")

# Configure Local Gemini
import google.generativeai as genai

gemini_api_key = os.getenv("GEMINI_API_KEY")
if gemini_api_key:
    try:
        genai.configure(api_key=gemini_api_key)
        # Prefer Gemini 2.5 Flash Image; fall back to the "flash-latest"
        # alias if constructing the preferred model raises.
        # NOTE: never log any part of the API key -- even a short prefix is
        # secret material that ends up in shared log storage.
        try:
            gemini_model = genai.GenerativeModel("models/gemini-2.5-flash-image")
            print("✅ Local Gemini 2.5 Flash Image loaded.")
        except Exception:
            print("⚠️ Gemini 2.5 failed, falling back to Flash Latest.")
            gemini_model = genai.GenerativeModel("models/gemini-flash-latest")
            print("✅ Local Gemini Flash Latest loaded.")
    except Exception as e:
        print(f"⚠️ Gemini Config Error: {e}")
        gemini_model = None
else:
    gemini_model = None
    print("⚠️ GEMINI_API_KEY not found in env.")
def _generate_with_backoff(model, prompt, image, retries=3):
    """Call ``model.generate_content`` and retry with backoff when rate limited.

    Only ``ResourceExhausted`` is retried; it is re-raised after the last
    attempt. Any other exception propagates immediately.
    """
    from google.api_core import exceptions

    for attempt in range(retries):
        try:
            return model.generate_content(
                [prompt, image],
                generation_config={"temperature": 0.1},
            )
        except exceptions.ResourceExhausted:
            if attempt == retries - 1:
                raise
            # Exponential backoff between attempts: 2s, then 4s (for retries=3).
            time.sleep(2 ** (attempt + 1))
    return None


def _gemini_ocr(pil_img):
    """Extract text from ``pil_img`` with the module-level Gemini model.

    Returns the extracted text, or a human-readable error/empty-result
    message. Never raises: rate limiting triggers one attempt with the
    "flash-latest" fallback model, and every other failure is reported in
    the returned string.
    """
    from google.api_core import exceptions

    prompt = (
        "Extract all text from this image exactly as it appears. "
        "Return ONLY the raw text. Do not add any conversational filler like 'Here is the text' or 'The text says'. "
        "If there is no text, return 'No text detected'."
    )

    def text_of(response):
        # Same empty-response handling for the primary and the fallback model.
        if response and response.text:
            return response.text.strip()
        return "No text detected (Empty response)."

    try:
        return text_of(_generate_with_backoff(gemini_model, prompt, pil_img))
    except exceptions.ResourceExhausted:
        print("⚠️ Primary model rate limited. Trying fallback model...")
        try:
            fallback_model = genai.GenerativeModel("models/gemini-flash-latest")
            return text_of(_generate_with_backoff(fallback_model, prompt, pil_img))
        except Exception as e:
            print(f"Fallback OCR failed: {e}")
            return f"OCR Error (Rate Limit): {e}"
    except Exception as e:
        print(f"Gemini OCR failed: {e}")
        return f"Gemini OCR Error: {e}"


def _format_scene_description(scene_data):
    """Render the object-detection result as a bulleted, human-readable list."""
    objects = scene_data["objects"] if scene_data and "objects" in scene_data else None
    if not objects:
        return "Objects detected:\nNo objects detected."
    entries = "".join(
        f"- {obj['label']} ({int(obj['score'] * 100)}%)\n" for obj in objects
    )
    return "Objects detected:\n" + entries


def process_vision(image):
    """Run OCR, object detection, text simplification and TTS on one image.

    Args:
        image: A PIL image or an array accepted by ``Image.fromarray``;
            ``None`` when the user has not provided an image.

    Returns:
        A 4-tuple ``(raw_ocr_text, simplified_text, scene_description,
        audio_path)`` matching the four Gradio outputs bound to this handler.
        Every path, including every error path, returns exactly four values;
        ``audio_path`` is ``None`` when no speech was produced.
    """
    print(f"📷 process_vision called with image type: {type(image)}")
    if image is None:
        print("❌ Error: Image is None")
        return "", "No image provided. Please snap a photo first.", "No image provided.", None
    if not model_inference:
        return "", "Backend not connected.", "Backend not connected.", None

    # Normalise to an RGB PIL image (used by Gemini) and cap the size.
    try:
        # Gradio can return PIL when type="pil"; otherwise numpy array
        if isinstance(image, Image.Image):
            pil_img = image.convert("RGB")
        else:
            pil_img = Image.fromarray(image).convert("RGB")

        # Resize image to max 1024px to reduce latency
        max_size = 1024
        if max(pil_img.size) > max_size:
            pil_img.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
            print(f"📉 Resized image to {pil_img.size}")
    except Exception as e:
        return "", f"Invalid image input: {e}", "Invalid image input.", None

    # JPEG bytes are what the Modal endpoints receive.
    jpeg_buffer = io.BytesIO()
    pil_img.save(jpeg_buffer, format="JPEG")
    img_bytes = jpeg_buffer.getvalue()

    # OCR: local Gemini when configured, otherwise the Modal-hosted engines.
    if gemini_model:
        ocr_text = _gemini_ocr(pil_img)
    else:
        ocr_text = ""
        if use_paddle_ocr and paddle_ocr_client:
            try:
                ocr_text = paddle_ocr_client.ocr_image.remote(img_bytes)
            except Exception as e:
                print(f"PaddleOCR failed, falling back to Chandra OCR: {e}")
                ocr_text = ""
        if not ocr_text:
            try:
                ocr_text = model_inference.process_image_ocr.remote(img_bytes)
            except Exception as e:
                return "", f"Inference error: {e}", "Inference error.", None

    # Detection (still on Modal to save local resources)
    try:
        scene_data = model_inference.detect_objects.remote(img_bytes)
    except Exception as e:
        return ocr_text, f"Inference error: {e}", "Inference error.", None
    scene_desc = _format_scene_description(scene_data)

    # Text simplification (still on Modal); skipped for very short OCR output.
    simplified_text = "No text to simplify."
    if ocr_text and len(ocr_text) > 5:
        try:
            simplified_text = model_inference.simplify_text.remote(ocr_text)
        except Exception as e:
            simplified_text = f"Simplification error: {e}"

    # TTS is best-effort: a failure leaves the text outputs intact.
    audio_path = None
    if simplified_text and simplified_text != "No text to simplify.":
        try:
            audio = text_to_speech(simplified_text)
        except Exception as e:
            print(f"TTS failed: {e}")
            audio = None
        if audio:
            # Save audio to file for Gradio (the audio output uses type="filepath").
            audio_path = "output_tts.mp3"
            with open(audio_path, "wb") as f:
                f.write(audio)

    return ocr_text, simplified_text, scene_desc, audio_path
def _get_whisper_model():
    """Return the shared Whisper "tiny" model, loading it on first use only."""
    model = getattr(_get_whisper_model, "_model", None)
    if model is None:
        import whisper  # optional dependency; ImportError is handled by the caller

        model = whisper.load_model("tiny")
        _get_whisper_model._model = model
    return model


def process_audio(audio_path):
    """Transcribe an audio file locally and derive demo haptic feedback.

    Args:
        audio_path: Path of the audio file to transcribe; falsy means no input.

    Returns:
        A pair ``(transcript, haptic_feedback)``. When there is no input, when
        the ``whisper`` package is missing, or when anything else fails, the
        first element carries a message describing the problem instead.
    """
    if not audio_path:
        return "No audio input.", ""

    # Sound classification is simulated for the demo: keywords in the
    # transcript stand in for a real classifier on the waveform. Order
    # matters -- the first matching keyword wins.
    keyword_to_sound = (
        ("bark", "dog_bark"),
        ("horn", "car_horn"),
        ("alarm", "alarm"),
        ("door", "doorbell"),
    )

    try:
        # The model is cached so it is not reloaded on every request.
        text = _get_whisper_model().transcribe(audio_path)["text"]

        text_lower = text.lower()
        detected_sound = next(
            (sound for keyword, sound in keyword_to_sound if keyword in text_lower),
            None,
        )

        haptic_feedback = "No specific sound pattern detected."
        if detected_sound:
            pattern = map_sound_to_vibration(detected_sound)
            haptic_feedback = f"Vibration: {pattern['pattern'].upper()} (Intensity: {pattern['intensity']})"

        return text, haptic_feedback
    except ImportError:
        return "Whisper not installed.", "Install openai-whisper to enable STT."
    except Exception as e:
        return f"Error: {str(e)}", ""
# Hearing Assistant Streaming Logic
async def process_hearing_stream(audio_chunk, state_text, audio_buffer_state, emotion_text, speaker_text):
    """Accumulate streamed microphone audio and caption it in 2-second windows.

    Args:
        audio_chunk: ``(sample_rate, samples)`` pair from the streaming audio
            component, or ``None`` when there is no new audio.
        state_text: Caption text accumulated so far.
        audio_buffer_state: Dict with keys ``"audio"`` (buffered samples) and
            ``"rate"``, or ``None`` before the first chunk.
        emotion_text: Last detected emotion label.
        speaker_text: Last detected speaker label.

    Returns:
        ``(state_text, audio_buffer_state, emotion_text, speaker_text)`` with
        whatever the backends updated. Backend failures are logged and leave
        the corresponding value unchanged.
    """
    if audio_chunk is None:
        return state_text or "", audio_buffer_state, emotion_text or "Neutral", speaker_text or "Unknown"

    rate, y = audio_chunk
    print(f"📥 Received audio chunk: rate={rate}, samples={len(y)}")

    # Multi-channel chunks arrive as 2-D arrays; the buffer is 1-D, so
    # downmix to mono instead of letting np.concatenate fail on the shapes.
    if getattr(y, "ndim", 1) > 1:
        y = y.mean(axis=1).astype(y.dtype)

    # Initialize buffer if None
    if audio_buffer_state is None:
        audio_buffer_state = {"audio": np.array([], dtype=y.dtype), "rate": rate}
        print("🔄 Initialized audio buffer")

    # Append new chunk
    audio_buffer_state["audio"] = np.concatenate((audio_buffer_state["audio"], y))

    # Check duration
    duration = len(audio_buffer_state["audio"]) / rate
    print(f"⏱️ Buffer duration: {duration:.2f}s")

    if duration < 2.0:  # Process every 2 seconds
        return state_text, audio_buffer_state, emotion_text, speaker_text

    print("🎤 Processing 2-second buffer...")
    try:
        import scipy.io.wavfile as wav

        # Convert numpy array to wav bytes
        byte_io = io.BytesIO()
        wav.write(byte_io, rate, audio_buffer_state["audio"])
        audio_bytes = byte_io.getvalue()

        loop = asyncio.get_running_loop()

        async def call_backend(model, method_name, announcement):
            # The synchronous .remote() call runs in a worker thread so the
            # event loop stays responsive while the backend is working.
            if not model:
                return None
            print(announcement)
            remote_call = getattr(model, method_name).remote
            return await loop.run_in_executor(None, remote_call, audio_bytes)

        # The three backends run concurrently; a failure in one is returned
        # as an exception object and does not affect the others.
        new_text, new_emotion, new_speaker = await asyncio.gather(
            call_backend(hearing_model, "transcribe_audio", "📝 Calling transcription..."),
            call_backend(emotion_model, "predict_emotion", "😊 Calling emotion detection..."),
            call_backend(diarization_model, "diarize_chunk", "👥 Calling speaker diarization..."),
            return_exceptions=True,
        )

        # Handle Transcription
        if isinstance(new_text, str) and new_text:
            print(f"✅ Transcription: {new_text}")
            state_text = (state_text or "") + " " + new_text
        elif isinstance(new_text, Exception):
            print(f"❌ Transcription Error: {new_text}")

        # Handle Emotion
        if isinstance(new_emotion, str) and new_emotion:
            print(f"✅ Emotion: {new_emotion}")
            emotion_text = new_emotion
        elif isinstance(new_emotion, Exception):
            print(f"❌ Emotion Error: {new_emotion}")

        # Handle Speaker
        if isinstance(new_speaker, str) and new_speaker:
            print(f"✅ Speaker: {new_speaker}")
            speaker_text = new_speaker
        elif isinstance(new_speaker, Exception):
            print(f"❌ Speaker Error: {new_speaker}")
    except Exception as e:
        print(f"❌ Hearing/Emotion/Diarization Error: {e}")
        import traceback

        traceback.print_exc()
    finally:
        # Always clear the buffer after a processing attempt, even a failed
        # one, so it cannot grow without bound.
        audio_buffer_state["audio"] = np.array([], dtype=y.dtype)
        print("🧹 Buffer cleared")

    return state_text, audio_buffer_state, emotion_text, speaker_text
# MCP Wrappers
def run_mcp_tool(tool_name):
    """Invoke the MCP integration selected by ``tool_name`` and return its result.

    "Calendar" fetches calendar events, "Email" summarizes emails and "Maps"
    navigates to "Home". Any other name returns the string "Unknown tool".
    """
    # Ordered (name, action) pairs; actions are lambdas so that nothing on
    # the MCP server is touched unless its name actually matches.
    dispatch = (
        ("Calendar", lambda: mcp.get_calendar_events()),
        ("Email", lambda: mcp.summarize_emails()),
        ("Maps", lambda: mcp.navigate_maps("Home")),
    )
    for name, action in dispatch:
        if tool_name == name:
            return action()
    return "Unknown tool"
# No custom stylesheet: the earlier custom CSS was removed to prevent UI
# conflicts. main() passes this value as the `css` argument of gr.Blocks.
custom_css = None
def main():
    """Build the Gradio UI and launch it on 0.0.0.0:7860.

    The interface has three tabs: a vision assistant (camera or upload, both
    handled by ``process_vision``), a live-captioning tab fed by the
    microphone stream, and an MCP integrations tab.
    """
    with gr.Blocks(theme=gr.themes.Soft(), css=custom_css, title="Access Companion") as demo:
        gr.Markdown("# ♿ Universal Accessibility Companion")

        with gr.Tabs():
            # --- Vision Tab ---
            with gr.Tab("👁️ Vision Assistant"):
                gr.Markdown("### Capture scene or document")
                with gr.Row():
                    with gr.Column():
                        with gr.Tabs():
                            with gr.Tab("📷 Camera"):
                                gr.Markdown("1. Click the **Camera Icon** to start video.\n2. Click the **Circle/Stop Icon** to snap photo.\n3. Click **Process**.")
                                camera_input = gr.Image(
                                    type="pil",
                                    sources=["webcam"],
                                    label="Take Photo"
                                )
                                cam_btn = gr.Button("📸 Process Camera Image", variant="primary")
                            with gr.Tab("📁 Upload"):
                                upload_input = gr.Image(type="pil", sources=["upload"], label="Upload Image")
                                upload_btn = gr.Button("🚀 Process Uploaded Image", variant="primary")
                    with gr.Column():
                        raw_ocr_output = gr.Textbox(label="Raw OCR (Debug)", lines=2)
                        ocr_output = gr.Textbox(label="Simplified Text (Voice Output)", lines=4)
                        scene_output = gr.Textbox(label="Scene Objects", lines=4)
                        audio_output = gr.Audio(label="TTS Output", type="filepath")

                # Each button feeds only its own image component to
                # process_vision, so no input-arbitration wrapper is needed.
                vision_outputs = [raw_ocr_output, ocr_output, scene_output, audio_output]
                cam_btn.click(
                    process_vision,
                    inputs=[camera_input],
                    outputs=vision_outputs
                )
                upload_btn.click(
                    process_vision,
                    inputs=[upload_input],
                    outputs=vision_outputs
                )

            # --- Hearing Tab ---
            with gr.Tab("🗣️ Speech Impaired Assistant"):
                gr.Markdown("### Real-time Captioning")
                gr.Markdown("Speak into your microphone. Captions will appear below.")
                with gr.Row():
                    # Explicitly set type="numpy" to avoid file processing errors
                    audio_stream = gr.Audio(sources=["microphone"], type="numpy", streaming=True, label="Microphone Input")
                    with gr.Column():
                        caption_output = gr.Textbox(label="Live Captions", lines=6, placeholder="Listening...")
                        emotion_output = gr.Textbox(label="Detected Emotion", lines=1, placeholder="Neutral")
                        speaker_output = gr.Textbox(label="Current Speaker", lines=1, placeholder="Unknown")

                audio_buffer = gr.State(None)        # Buffer state
                caption_state = gr.State("")         # Caption state
                emotion_state = gr.State("Neutral")  # Emotion state
                speaker_state = gr.State("Unknown")  # Speaker state

                async def hearing_stream_step(chunk, captions, buffer, emotion, speaker):
                    # Write the caption/emotion/speaker values back into their
                    # State components as well as the visible textboxes.
                    # Without this the states never change, so captions cannot
                    # accumulate and the textboxes are reset to the initial
                    # values on every chunk that does not trigger processing.
                    captions, buffer, emotion, speaker = await process_hearing_stream(
                        chunk, captions, buffer, emotion, speaker
                    )
                    return captions, buffer, emotion, speaker, captions, emotion, speaker

                # Streaming event - only audio_stream changes trigger the callback
                audio_stream.stream(
                    hearing_stream_step,
                    inputs=[audio_stream, caption_state, audio_buffer, emotion_state, speaker_state],
                    outputs=[
                        caption_output,
                        audio_buffer,
                        emotion_output,
                        speaker_output,
                        caption_state,
                        emotion_state,
                        speaker_state,
                    ],
                    show_progress="hidden"
                )

            # --- Integrations Tab ---
            with gr.Tab("🔗 Integrations (MCP)"):
                gr.Markdown("### Connected Services")
                with gr.Row():
                    cal_btn = gr.Button("📅 Check Calendar")
                    email_btn = gr.Button("📧 Read Emails")
                    maps_btn = gr.Button("🗺️ Navigate Home")
                tool_output = gr.Textbox(label="System Response", lines=5)

                cal_btn.click(lambda: run_mcp_tool("Calendar"), outputs=tool_output)
                email_btn.click(lambda: run_mcp_tool("Email"), outputs=tool_output)
                maps_btn.click(lambda: run_mcp_tool("Maps"), outputs=tool_output)

    demo.launch(server_name="0.0.0.0", server_port=7860)


if __name__ == "__main__":
    main()