Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 136 additions & 29 deletions backend/tests/unit/test_sync_opus_decode.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
Each scenario corresponds to a real-world sticky-pending failure mode.
"""

import importlib.util
import os
import struct
import sys
import tempfile
import wave
from types import ModuleType
from unittest.mock import MagicMock, patch

import pytest
Expand Down Expand Up @@ -68,50 +70,155 @@
sys.modules[_mod] = MagicMock()


def _ensure_attr(module_name: str, attr: str):
    """Guarantee that the module registered as *module_name* exposes *attr*.

    The module is looked up in ``sys.modules``. If nothing is registered
    under that name yet, a ``MagicMock`` stand-in is registered first instead
    of failing with ``KeyError``.

    Args:
        module_name: Key to look up (or create) in ``sys.modules``.
        attr: Attribute name that must exist on the module afterwards.

    Returns:
        The module object (real or stand-in) registered under *module_name*.
    """
    module = sys.modules.setdefault(module_name, MagicMock())
    # Never overwrite something a previous setup step already provided.
    if not hasattr(module, attr):
        setattr(module, attr, MagicMock())
    return module
def _ensure_attrs(module_name: str, attrs):
    """Guarantee that the module registered as *module_name* exposes *attrs*.

    The module is fetched from ``sys.modules``; when it is not registered a
    ``MagicMock`` stand-in is registered under that name. Attributes that
    already exist are left untouched, missing ones are filled with a fresh
    ``MagicMock``.

    Args:
        module_name: Key to look up (or create) in ``sys.modules``.
        attrs: Iterable of attribute names. A single name given as a plain
            string is accepted and treated as a one-element list.

    Returns:
        The module object (real or stand-in) registered under *module_name*.
    """
    if isinstance(attrs, str):
        # A bare string would otherwise be iterated character by character.
        attrs = (attrs,)

    module = sys.modules.setdefault(module_name, MagicMock())
    # NOTE(review): a MagicMock module auto-creates ordinary attributes on
    # access, so hasattr() is already True there; the fill-in below matters
    # for real modules or hand-written stub objects.
    for attr in attrs:
        if not hasattr(module, attr):
            setattr(module, attr, MagicMock())
    return module


for _module_name, _attrs in {
'opuslib': ['Decoder'],
'database.conversations': ['get_closest_conversation_to_timestamps', 'update_conversation_segments'],
'models.conversation': ['Conversation', 'CreateConversation'],
'models.conversation_enums': ['ConversationSource'],
'models.transcript_segment': ['TranscriptSegment'],
'utils.conversations.factory': ['deserialize_conversation'],
'utils.conversations.process_conversation': ['process_conversation'],
'utils.analytics': ['record_usage'],
'utils.byok': ['get_byok_keys', 'set_byok_keys', 'has_byok_keys'],
'utils.cloud_tasks': [
class _ConversationSource:
    """Lightweight stand-in offering ``omi``, ``limitless`` and ``unknown`` members.

    Every member is a plain string equal to its own name.
    """

    omi = 'omi'
    limitless = 'limitless'
    unknown = 'unknown'


def _ensure_conversation_source_stub():
    """Make ``ConversationSource`` usable on both conversation model modules.

    ``models.conversation_enums`` and ``models.conversation`` are fetched from
    ``sys.modules`` (a ``MagicMock`` is registered for either one that is
    missing). A ``ConversationSource`` that is absent or lacks the ``omi`` /
    ``limitless`` members is replaced: the enums module receives
    ``_ConversationSource`` and the conversation module receives whatever the
    enums module ends up exposing, so both modules agree.
    """
    required_members = ('omi', 'limitless')

    def _is_usable(candidate):
        # The same check is applied to both modules so they cannot diverge.
        return candidate is not None and all(hasattr(candidate, member) for member in required_members)

    # NOTE(review): on a MagicMock module the attribute and its members are
    # auto-created, so the check passes and nothing is replaced; the stub is
    # only installed over real modules / plain objects -- confirm intended.
    enums_mod = sys.modules.setdefault('models.conversation_enums', MagicMock())
    if not _is_usable(getattr(enums_mod, 'ConversationSource', None)):
        enums_mod.ConversationSource = _ConversationSource

    conversation_mod = sys.modules.setdefault('models.conversation', MagicMock())
    if not _is_usable(getattr(conversation_mod, 'ConversationSource', None)):
        conversation_mod.ConversationSource = enums_mod.ConversationSource


def _install_python_multipart_stub():
    """Register a placeholder ``python_multipart`` module when none is available.

    Nothing is done when ``python_multipart`` is already in ``sys.modules`` or
    when the import system can locate it.

    Returns:
        ``True`` only when this call registered the placeholder, so the caller
        knows whether it is responsible for removing it again; ``False``
        otherwise.
    """
    if 'python_multipart' in sys.modules:
        return False
    if importlib.util.find_spec('python_multipart') is not None:
        return False

    stub = ModuleType('python_multipart')
    # NOTE(review): presumably set so importers that read the package version
    # keep working -- confirm which consumer needs it.
    stub.__version__ = '0.0.20'
    sys.modules['python_multipart'] = stub
    return True


# Give the already-registered database stubs concrete MagicMock handles.
sys.modules['database.redis_db'].r = MagicMock()
sys.modules['database._client'].db = MagicMock()

# Make sure every stubbed dependency exposes the names listed for it
# (presumably the names routers.sync imports from each one -- confirm there).
_ensure_attrs('opuslib', ['Decoder'])
_ensure_attrs('database.conversations', ['get_closest_conversation_to_timestamps', 'update_conversation_segments'])
_ensure_attrs(
    'database.sync_jobs',
    [
        'TERMINAL_STATUSES',
        'create_sync_job',
        'get_sync_job',
        'update_sync_job',
        'mark_job_processing',
        'mark_job_completed',
        'mark_job_failed',
        'mark_job_queued_for_retry',
        'try_acquire_job_run_lock',
        'release_job_run_lock',
        'add_processed_segment',
        'get_processed_segments',
        'try_mark_once',
    ],
)
_ensure_attrs('models.conversation', ['Conversation', 'CreateConversation'])
_ensure_conversation_source_stub()
_ensure_attrs('models.transcript_segment', ['TranscriptSegment'])
_ensure_attrs('utils.conversations.factory', ['deserialize_conversation'])
_ensure_attrs('utils.conversations.process_conversation', ['process_conversation'])
_ensure_attrs('utils.analytics', ['record_usage'])
_ensure_attrs('utils.other.endpoints', ['get_current_user_uid'])
_ensure_attrs(
    'utils.other.storage',
    [
        'get_syncing_file_temporal_signed_url',
        'delete_syncing_temporal_file',
        'schedule_syncing_temporal_file_deletion',
        'upload_syncing_temporal_file',
        'download_syncing_temporal_file',
        'download_audio_chunks_and_merge',
        'get_or_create_merged_audio',
        'get_merged_audio_signed_url',
        'download_legacy_merged_wav',
        'get_playback_artifact_signed_url',
        'download_playback_artifact',
        'upload_playback_artifact',
        'mark_playback_unavailable',
        'is_playback_unavailable',
        'enqueue_conversation_audio_merge',
        '_PRECACHE_FILE_SEM',
    ],
)
_ensure_attrs('utils.byok', ['get_byok_keys', 'set_byok_keys', 'has_byok_keys'])
_ensure_attrs(
    'utils.cloud_tasks',
    [
        'enqueue_sync_job',
        'get_sync_tasks_max_attempts',
        'is_audio_merge_dispatch_enabled',
        'is_cloud_tasks_dispatch_enabled',
        'verify_cloud_tasks_oidc',
    ],
)
_ensure_attrs('utils.http_client', ['_get_semaphore'])
_ensure_attrs(
    'utils.executors',
    [
        'critical_executor',
        'db_executor',
        'postprocess_executor',
        'storage_executor',
        'sync_executor',
        'run_blocking',
        'start_background_task',
        'submit_with_context',
    ],
)
_ensure_attrs('utils.stt.pre_recorded', ['postprocess_words', 'prerecorded'])
_ensure_attrs('utils.stt.vad', ['vad_is_empty'])
_ensure_attrs(
    'utils.fair_use',
    [
        'record_speech_ms',
        'get_rolling_speech_ms',
        'check_soft_caps',
        'is_hard_restricted',
        'trigger_classifier_if_needed',
        'is_dg_budget_exhausted',
        'get_enforcement_stage',
        'record_dg_usage_ms',
        'FAIR_USE_ENABLED',
        'FAIR_USE_RESTRICT_DAILY_DG_MS',
    ],
)
_ensure_attrs('utils.speaker_assignment', ['process_speaker_assigned_segments'])
_ensure_attrs('utils.speaker_identification', ['detect_speaker_from_text'])
_ensure_attrs(
    'utils.stt.speaker_embedding',
    ['extract_embedding_from_bytes', 'compare_embeddings', 'SPEAKER_MATCH_THRESHOLD'],
)
_ensure_attrs('utils.subscription', ['has_transcription_credits'])
_ensure_attrs('pydub', ['AudioSegment'])

# Register google.cloud.tasks_v2 and expose it as an attribute of the
# google.cloud parent so both access paths reach the same object.
if 'google.cloud.tasks_v2' not in sys.modules:
    sys.modules['google.cloud.tasks_v2'] = MagicMock()
if not hasattr(sys.modules.setdefault('google.cloud', MagicMock()), 'tasks_v2'):
    sys.modules['google.cloud'].tasks_v2 = sys.modules['google.cloud.tasks_v2']

# The sanitizers become identity functions so values pass through unchanged.
sys.modules['utils.log_sanitizer'].sanitize = lambda x: x
sys.modules['utils.log_sanitizer'].sanitize_pii = lambda x: x

# Import the code under test only after the python_multipart guard has run,
# and remove the placeholder again if this module was the one that added it.
_remove_python_multipart_stub = _install_python_multipart_stub()
try:
    from routers.sync import decode_opus_file_to_wav, decode_files_to_wav  # noqa: E402
finally:
    if _remove_python_multipart_stub:
        sys.modules.pop('python_multipart', None)

# ---------------------------------------------------------------------------
# Helpers
Expand Down
165 changes: 136 additions & 29 deletions backend/tests/unit/test_sync_pcm_decode.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Tests for PCM16 WAL file decode in sync.py."""

import importlib.util
import os
import struct
import sys
Expand Down Expand Up @@ -54,49 +55,155 @@
sys.modules[mod_name] = MagicMock()


def _ensure_attr(module_name: str, attr: str):
    """Ensure the module registered as *module_name* has an attribute *attr*.

    A module that is not yet present in ``sys.modules`` is registered as a
    ``MagicMock`` stand-in rather than triggering ``KeyError``.

    Args:
        module_name: Key to look up (or create) in ``sys.modules``.
        attr: Attribute name that must exist on the module afterwards.

    Returns:
        The module object (real or stand-in) registered under *module_name*.
    """
    target = sys.modules.setdefault(module_name, MagicMock())
    # Existing attributes win; only gaps are filled with a MagicMock.
    if not hasattr(target, attr):
        setattr(target, attr, MagicMock())
    return target
def _ensure_attrs(module_name: str, attrs):
    """Ensure the module registered as *module_name* has every name in *attrs*.

    The module comes from ``sys.modules``; a ``MagicMock`` stand-in is
    registered when the name is unknown. Attributes already present are kept,
    missing ones are set to a fresh ``MagicMock``.

    Args:
        module_name: Key to look up (or create) in ``sys.modules``.
        attrs: Iterable of attribute names. A single name passed as a plain
            string is accepted and handled as a one-element list.

    Returns:
        The module object (real or stand-in) registered under *module_name*.
    """
    if isinstance(attrs, str):
        # Guard against iterating a lone name one character at a time.
        attrs = (attrs,)

    target = sys.modules.setdefault(module_name, MagicMock())
    # NOTE(review): MagicMock modules auto-create ordinary attributes, so the
    # hasattr() test is already satisfied for them; the fill-in matters for
    # real modules or hand-written stub objects.
    for name in attrs:
        if not hasattr(target, name):
            setattr(target, name, MagicMock())
    return target


class _ConversationSource:
    """Plain-string stand-in with ``omi``, ``limitless`` and ``unknown`` members.

    Each member's value is the member's own name.
    """

    omi = 'omi'
    limitless = 'limitless'
    unknown = 'unknown'


def _ensure_conversation_source_stub():
    """Ensure both conversation model modules expose a usable ``ConversationSource``.

    ``models.conversation_enums`` and ``models.conversation`` are taken from
    ``sys.modules`` (registering a ``MagicMock`` for whichever is missing).
    A ``ConversationSource`` that is absent or lacks ``omi`` / ``limitless``
    is replaced: the enums module gets ``_ConversationSource`` and the
    conversation module gets the enums module's value, keeping the two in
    agreement.
    """
    needed = ('omi', 'limitless')

    def _has_members(obj):
        # One shared predicate so both modules are judged by the same rule.
        return obj is not None and all(hasattr(obj, member) for member in needed)

    # NOTE(review): MagicMock modules auto-create the attribute and its
    # members, so they always pass this check and are left alone -- confirm
    # that is the intended behaviour.
    enums_module = sys.modules.setdefault('models.conversation_enums', MagicMock())
    if not _has_members(getattr(enums_module, 'ConversationSource', None)):
        enums_module.ConversationSource = _ConversationSource

    conversation_module = sys.modules.setdefault('models.conversation', MagicMock())
    if not _has_members(getattr(conversation_module, 'ConversationSource', None)):
        conversation_module.ConversationSource = enums_module.ConversationSource


def _install_python_multipart_stub():
    """Register a placeholder ``python_multipart`` module if none can be found.

    The call is a no-op when ``python_multipart`` is already in
    ``sys.modules`` or when the import system can locate it.

    Returns:
        ``True`` only when this call registered the placeholder (the caller
        is then responsible for removing it); ``False`` otherwise.
    """
    # Imported locally so the helper does not depend on a module-level
    # ``ModuleType`` import being present in this file.
    from types import ModuleType

    if 'python_multipart' in sys.modules:
        return False
    if importlib.util.find_spec('python_multipart') is not None:
        return False

    placeholder = ModuleType('python_multipart')
    # NOTE(review): presumably set so importers that read the package version
    # keep working -- confirm which consumer needs it.
    placeholder.__version__ = '0.0.20'
    sys.modules['python_multipart'] = placeholder
    return True


# Ensure specific attributes exist on key stubs.
# The database stubs first get concrete MagicMock handles.
sys.modules['database.redis_db'].r = MagicMock()
sys.modules['database._client'].db = MagicMock()

# Each stubbed dependency must expose the names listed for it
# (presumably the names routers.sync imports from each one -- confirm there).
_ensure_attrs('opuslib', ['Decoder'])
_ensure_attrs('database.conversations', ['get_closest_conversation_to_timestamps', 'update_conversation_segments'])
_ensure_attrs(
    'database.sync_jobs',
    [
        'TERMINAL_STATUSES',
        'create_sync_job',
        'get_sync_job',
        'update_sync_job',
        'mark_job_processing',
        'mark_job_completed',
        'mark_job_failed',
        'mark_job_queued_for_retry',
        'try_acquire_job_run_lock',
        'release_job_run_lock',
        'add_processed_segment',
        'get_processed_segments',
        'try_mark_once',
    ],
)
_ensure_attrs('models.conversation', ['Conversation', 'CreateConversation'])
_ensure_conversation_source_stub()
_ensure_attrs('models.transcript_segment', ['TranscriptSegment'])
_ensure_attrs('utils.conversations.factory', ['deserialize_conversation'])
_ensure_attrs('utils.conversations.process_conversation', ['process_conversation'])
_ensure_attrs('utils.analytics', ['record_usage'])
_ensure_attrs('utils.other.endpoints', ['get_current_user_uid'])
_ensure_attrs(
    'utils.other.storage',
    [
        'get_syncing_file_temporal_signed_url',
        'delete_syncing_temporal_file',
        'schedule_syncing_temporal_file_deletion',
        'upload_syncing_temporal_file',
        'download_syncing_temporal_file',
        'download_audio_chunks_and_merge',
        'get_or_create_merged_audio',
        'get_merged_audio_signed_url',
        'download_legacy_merged_wav',
        'get_playback_artifact_signed_url',
        'download_playback_artifact',
        'upload_playback_artifact',
        'mark_playback_unavailable',
        'is_playback_unavailable',
        'enqueue_conversation_audio_merge',
        '_PRECACHE_FILE_SEM',
    ],
)
_ensure_attrs('utils.byok', ['get_byok_keys', 'set_byok_keys', 'has_byok_keys'])
_ensure_attrs(
    'utils.cloud_tasks',
    [
        'enqueue_sync_job',
        'get_sync_tasks_max_attempts',
        'is_audio_merge_dispatch_enabled',
        'is_cloud_tasks_dispatch_enabled',
        'verify_cloud_tasks_oidc',
    ],
)
_ensure_attrs('utils.http_client', ['_get_semaphore'])
_ensure_attrs('utils.log_sanitizer', ['sanitize'])
_ensure_attrs(
    'utils.executors',
    [
        'critical_executor',
        'db_executor',
        'postprocess_executor',
        'storage_executor',
        'sync_executor',
        'run_blocking',
        'start_background_task',
        'submit_with_context',
    ],
)
_ensure_attrs('utils.stt.pre_recorded', ['postprocess_words', 'prerecorded'])
_ensure_attrs('utils.stt.vad', ['vad_is_empty'])
_ensure_attrs(
    'utils.fair_use',
    [
        'record_speech_ms',
        'get_rolling_speech_ms',
        'check_soft_caps',
        'is_hard_restricted',
        'trigger_classifier_if_needed',
        'is_dg_budget_exhausted',
        'get_enforcement_stage',
        'record_dg_usage_ms',
        'FAIR_USE_ENABLED',
        'FAIR_USE_RESTRICT_DAILY_DG_MS',
    ],
)
_ensure_attrs('utils.speaker_assignment', ['process_speaker_assigned_segments'])
_ensure_attrs('utils.speaker_identification', ['detect_speaker_from_text'])
_ensure_attrs(
    'utils.stt.speaker_embedding',
    ['extract_embedding_from_bytes', 'compare_embeddings', 'SPEAKER_MATCH_THRESHOLD'],
)
_ensure_attrs('utils.subscription', ['has_transcription_credits'])
_ensure_attrs('pydub', ['AudioSegment'])

# Register google.cloud.tasks_v2 and expose it as an attribute of the
# google.cloud parent so both access paths reach the same object.
if 'google.cloud.tasks_v2' not in sys.modules:
    sys.modules['google.cloud.tasks_v2'] = MagicMock()
if not hasattr(sys.modules.setdefault('google.cloud', MagicMock()), 'tasks_v2'):
    sys.modules['google.cloud'].tasks_v2 = sys.modules['google.cloud.tasks_v2']

# Import the code under test only after the python_multipart guard has run,
# and remove the placeholder again if this module was the one that added it.
_remove_python_multipart_stub = _install_python_multipart_stub()
try:
    from routers.sync import _is_pcm_codec, decode_pcm_file_to_wav, decode_files_to_wav
finally:
    if _remove_python_multipart_stub:
        sys.modules.pop('python_multipart', None)


class TestIsPcmCodec:
Expand Down
Loading