import { storage } from "./storage";
import type { InsertTranscription } from "@shared/schema";
import path from "path";
import fs from "fs";
import { spawn } from "child_process";
import axios from "axios";
import FormData from "form-data";
import { getRTMPUrl, isStreamActive, isRTMPServerRunning } from "./rtmp-server";

/** Root directory for temporary WAV clips; each session writes into its own subdirectory. */
const TEMP_DIR = "uploads/transcription/";
/** Temp files whose mtime is older than this are removed by the periodic cleanup. */
const MAX_TEMP_FILE_AGE = 1000 * 60 * 5; // 5 minutes in milliseconds

/** Length of each audio clip captured by FFmpeg, in seconds. */
const CLIP_DURATION_SECONDS = 20;
/** How often a new clip is captured for an active session (one clip length). */
const CLIP_INTERVAL_MS = CLIP_DURATION_SECONDS * 1000;
/** Hard limit on a single FFmpeg capture before the process is killed. */
const FFMPEG_TIMEOUT_MS = 25000;
/** Upper bound on one OpenAI HTTP request so a hung call cannot stall a clip forever. */
const OPENAI_REQUEST_TIMEOUT_MS = 60000;

/**
 * Active transcription timers keyed by session id. A session has an entry here
 * between `startTranscriptionService` and `stopTranscriptionService`.
 */
export const transcriptionIntervals = new Map<number, ReturnType<typeof setInterval>>();

/**
 * Chooses the audio input for a clip.
 *
 * @param mode - "test" uses the local `uploads/3.mp3`; any other value uses the session's RTMP stream.
 * @param sessionId - Session whose stream is read in live mode.
 * @returns A file path or RTMP URL suitable for FFmpeg's `-i`.
 * @throws Error if test mode is selected and the test file is missing.
 */
function resolveAudioSource(mode: string, sessionId: number): string {
  if (mode === "test") {
    const testFile = path.join(process.cwd(), "uploads", "3.mp3");
    if (!fs.existsSync(testFile)) {
      throw new Error("Test file '3.mp3' not found in uploads folder");
    }
    console.log("📁 Using test file:", testFile);
    return testFile;
  }

  const rtmpUrl = `rtmp://confapp.co:1935/live/session_${sessionId}`;
  console.log("🎥 Using RTMP stream:", rtmpUrl);
  return rtmpUrl;
}

/**
 * Runs FFmpeg and resolves once it exits with code 0 and `outputPath` exists and is non-empty.
 *
 * @param args - Full FFmpeg argument list.
 * @param outputPath - File FFmpeg is expected to write.
 * @param timeoutMs - The process is killed and the promise rejected after this many milliseconds.
 * @throws Error if FFmpeg cannot be spawned, fails, produces no output, or times out.
 */
function runFfmpeg(args: string[], outputPath: string, timeoutMs: number): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    console.log('Executing FFmpeg with args:', args.join(' '));

    // Output goes to a file, so stdout is unused; stdin is ignored so FFmpeg never waits on it.
    const ffmpeg = spawn('ffmpeg', args, { stdio: ['ignore', 'ignore', 'pipe'] });
    let stderrData = '';

    // Kill a capture that hangs (e.g. a stalled RTMP stream). Cleared as soon as FFmpeg
    // exits or fails to spawn so finished clips do not leave timers behind.
    const timer = setTimeout(() => {
      ffmpeg.kill();
      reject(new Error(`FFmpeg process timed out after ${timeoutMs / 1000} seconds`));
    }, timeoutMs);

    ffmpeg.stderr?.on('data', (chunk: Buffer) => {
      const text = chunk.toString();
      stderrData += text;
      // Echo only progress chunks so the console is not flooded with FFmpeg banners.
      if (text.includes('time=')) {
        console.log('FFmpeg progress:', text.trim());
      }
    });

    ffmpeg.on('close', (code) => {
      clearTimeout(timer);
      if (code === 0 && fs.existsSync(outputPath) && fs.statSync(outputPath).size > 0) {
        console.log('✅ FFmpeg completed successfully');
        resolve();
      } else {
        console.error('FFmpeg failed with code:', code);
        console.error('FFmpeg stderr:', stderrData);
        reject(new Error(`FFmpeg failed with code ${code}`));
      }
    });

    ffmpeg.on('error', (err) => {
      clearTimeout(timer);
      console.error('FFmpeg spawn error:', err);
      reject(err);
    });
  });
}

/**
 * Captures one clip of session audio, transcribes it (Arabic) with OpenAI Whisper,
 * translates it to English, and stores the result as a "pending" transcription.
 * Clips in which Whisper finds no speech are skipped without touching the database.
 *
 * The temporary WAV file is always deleted afterwards. Errors are logged and rethrown.
 *
 * @param sessionId - Session whose audio is captured.
 */
async function processStreamClip(sessionId: number): Promise<void> {
  let tempFilePath: string | null = null;

  try {
    console.log(`🎬 Starting stream clip processing for session ${sessionId}`);
    const processStartTime = Date.now();

    // Transcription mode comes from persisted settings; anything unset/empty means "test".
    const settings = await storage.getSettings();
    const mode = settings?.transcriptionMode || "test";
    console.log(`🔄 Running in ${mode} mode for session ${sessionId}`);

    const audioSource = resolveAudioSource(mode, sessionId);

    // Session-specific directory; recursive mkdir is a no-op when it already exists.
    const sessionDir = path.join(TEMP_DIR, `session_${sessionId}`);
    fs.mkdirSync(sessionDir, { recursive: true });

    const outputPath = path.join(sessionDir, `clip_${Date.now()}.wav`);
    tempFilePath = outputPath;

    console.log(`🎙 Converting audio to WAV format for session ${sessionId}...`);
    // 16 kHz mono 16-bit PCM WAV, limited to one clip length.
    const ffmpegArgs = [
      '-re',
      '-probesize', '32',
      '-analyzeduration', '0',
      '-i', audioSource,
      '-t', String(CLIP_DURATION_SECONDS),
      '-bufsize', '64k',
      '-acodec', 'pcm_s16le',
      '-ac', '1',
      '-ar', '16000',
      '-y', outputPath,
    ];
    await runFfmpeg(ffmpegArgs, outputPath, FFMPEG_TIMEOUT_MS);

    // Anchor the clip window at capture completion so the stored times are not
    // shifted later by the Whisper/translation round-trips below.
    const clipEndTime = new Date();
    const clipStartTime = new Date(clipEndTime.getTime() - CLIP_DURATION_SECONDS * 1000);

    const transcript = await transcribeWithOpenAI(outputPath);

    if (transcript.trim()) {
      const translation = await translateWithOpenAI(transcript);

      const dbStartTime = Date.now();
      const transcriptionData: InsertTranscription = {
        sessionId,
        sourceLanguage: "Arabic",
        originalText: transcript,
        translatedText: translation,
        clipStartTime,
        clipEndTime,
        status: "pending",
      };
      await storage.createTranscription(transcriptionData);
      console.log(`⏱ Database save time: ${Date.now() - dbStartTime}ms`);
    } else {
      console.log(`🔇 No speech detected in clip for session ${sessionId}; nothing saved`);
    }

    const totalProcessingTime = Date.now() - processStartTime;
    console.log(`⏱ Total processing time for session ${sessionId}: ${totalProcessingTime}ms`);
  } catch (error) {
    console.error(`❌ Error processing stream clip for session ${sessionId}:`, error);
    throw error;
  } finally {
    // Always remove this clip's temp file, whether or not processing succeeded.
    if (tempFilePath && fs.existsSync(tempFilePath)) {
      try {
        fs.unlinkSync(tempFilePath);
        console.log(`🧹 Cleaned up temp file for session ${sessionId}: ${tempFilePath}`);
      } catch (cleanupError) {
        console.error(`Failed to clean up temp file ${tempFilePath}:`, cleanupError);
      }
    }
  }
}

// Ensure the temp root exists before any clip or cleanup pass touches it.
fs.mkdirSync(TEMP_DIR, { recursive: true });

/**
 * Deletes files under `dir` (recursing into per-session subdirectories) whose mtime
 * is older than MAX_TEMP_FILE_AGE. Directories themselves are kept because an active
 * session may be about to write into one. A failure on one entry is logged and does
 * not stop the rest of the sweep.
 *
 * @param dir - Directory to sweep; defaults to TEMP_DIR.
 * @param now - Reference time in ms, shared across the recursive sweep.
 */
function cleanupOldTempFiles(dir: string = TEMP_DIR, now: number = Date.now()): void {
  let entries: fs.Dirent[];
  try {
    entries = fs.readdirSync(dir, { withFileTypes: true });
  } catch (error) {
    console.error("Error cleaning up temp files:", error);
    return;
  }

  for (const entry of entries) {
    const entryPath = path.join(dir, entry.name);
    try {
      if (entry.isDirectory()) {
        cleanupOldTempFiles(entryPath, now);
      } else if (now - fs.statSync(entryPath).mtimeMs > MAX_TEMP_FILE_AGE) {
        fs.unlinkSync(entryPath);
        console.log(`🧹 Cleaned up old temp file: ${entryPath}`);
      }
    } catch (error) {
      console.error(`Error cleaning up temp file ${entryPath}:`, error);
    }
  }
}

// Run cleanup periodically
setInterval(() => cleanupOldTempFiles(), MAX_TEMP_FILE_AGE);

/**
 * Builds the OpenAI Authorization header.
 *
 * @throws Error if OPENAI_API_KEY is not set (instead of sending "Bearer undefined").
 */
function openAIAuthHeader(): { Authorization: string } {
  const apiKey = process.env.OPENAI_API_KEY;
  if (!apiKey) {
    throw new Error("OPENAI_API_KEY is not set");
  }
  return { Authorization: `Bearer ${apiKey}` };
}

/** Logs an OpenAI request failure, including the API's error body when the error came from axios. */
function logOpenAIError(label: string, error: unknown): void {
  console.error(
    label,
    error instanceof Error ? error.message : "Unknown error",
    axios.isAxiosError(error) ? error.response?.data : undefined
  );
}

/**
 * Transcribes an Arabic WAV file with OpenAI Whisper (`whisper-1`).
 *
 * @param audioFilePath - Path to the WAV clip to upload.
 * @returns The transcript text; may be empty when Whisper detects no speech.
 * @throws Error if the request fails or the response carries no `text` field.
 */
async function transcribeWithOpenAI(audioFilePath: string): Promise<string> {
  const audioStream = fs.createReadStream(audioFilePath);
  try {
    console.log("🎙 Starting OpenAI Whisper transcription");
    const startTime = Date.now();

    const formData = new FormData();
    formData.append("file", audioStream, {
      filename: "audio.wav",
      contentType: "audio/wav",
    });
    formData.append("model", "whisper-1");
    formData.append("language", "ar");

    const response = await axios.post(
      "https://api.openai.com/v1/audio/transcriptions",
      formData,
      {
        headers: {
          ...formData.getHeaders(),
          ...openAIAuthHeader(),
        },
        timeout: OPENAI_REQUEST_TIMEOUT_MS,
      }
    );

    console.log(`⏱ OpenAI Whisper API response time: ${Date.now() - startTime}ms`);

    // An empty string is a valid result (silence); only a missing field is an error.
    const text: unknown = response.data?.text;
    if (typeof text !== "string") {
      throw new Error("No transcription text in response");
    }

    console.log("✅ Received transcript from OpenAI Whisper");
    return text;
  } catch (error) {
    logOpenAIError("❌ OpenAI Whisper API Error:", error);
    throw error;
  } finally {
    // Release the file handle even if the upload failed part-way.
    audioStream.destroy();
  }
}

/**
 * Translates Arabic text to English with the OpenAI chat completions API (`gpt-4`).
 *
 * @param text - Arabic source text.
 * @returns The English translation.
 * @throws Error if the request fails or the response contains no translation text.
 */
async function translateWithOpenAI(text: string): Promise<string> {
  try {
    console.log("🤖 Starting OpenAI translation");
    const startTime = Date.now();

    const response = await axios.post(
      "https://api.openai.com/v1/chat/completions",
      {
        model: "gpt-4",
        messages: [
          {
            role: "system",
            content:
              "You are a professional translator. Translate the following Arabic text to English, maintaining the original meaning and context.",
          },
          {
            role: "user",
            content: text,
          },
        ],
      },
      {
        headers: {
          ...openAIAuthHeader(),
          "Content-Type": "application/json",
        },
        timeout: OPENAI_REQUEST_TIMEOUT_MS,
      }
    );

    console.log(`⏱ OpenAI API response time: ${Date.now() - startTime}ms`);

    // Validate the shape instead of letting an empty `choices` array throw a TypeError.
    const translation: unknown = response.data?.choices?.[0]?.message?.content;
    if (typeof translation !== "string" || !translation.trim()) {
      throw new Error("No translation text in response");
    }

    console.log("✅ Received translation from OpenAI");
    return translation;
  } catch (error) {
    logOpenAIError("❌ OpenAI API Error:", error);
    throw error;
  }
}

/**
 * Starts (or restarts) continuous transcription for a session: one clip is
 * processed immediately and then every CLIP_INTERVAL_MS. Clip failures are
 * logged and do not stop the schedule.
 *
 * @param sessionId - Session to transcribe.
 */
export function startTranscriptionService(sessionId: number): void {
  console.log(`🚀 Starting transcription service for session: ${sessionId}`);

  // Restarting a session replaces its existing timer rather than stacking a second one.
  if (transcriptionIntervals.has(sessionId)) {
    stopTranscriptionService(sessionId);
  }

  const runClip = (): void => {
    void processStreamClip(sessionId).catch((error: unknown) => {
      console.error(`Failed to process stream clip for session ${sessionId}:`, error);
    });
  };

  runClip();
  const interval = setInterval(runClip, CLIP_INTERVAL_MS);
  transcriptionIntervals.set(sessionId, interval);

  console.log(`📝 Transcription service started for session ${sessionId}`);
}

/**
 * Stops scheduling new clips for one session, or for every session when
 * `sessionId` is omitted. A clip already in progress is not cancelled.
 *
 * @param sessionId - Session to stop; `undefined`/`null` stops all sessions.
 */
export function stopTranscriptionService(sessionId?: number): void {
  // Compare against null/undefined explicitly so session id 0 is not treated as "stop all".
  if (sessionId == null) {
    transcriptionIntervals.forEach((interval, id) => {
      clearInterval(interval);
      console.log(`⏹ Transcription service stopped for session ${id}`);
    });
    transcriptionIntervals.clear();
    return;
  }

  const interval = transcriptionIntervals.get(sessionId);
  if (interval !== undefined) {
    clearInterval(interval);
    transcriptionIntervals.delete(sessionId);
    console.log(`⏹ Transcription service stopped for session ${sessionId}`);
  }
}