79686777

Date: 2025-07-02 03:15:28
Score: 4.5
Natty:
Report link
import os
import re
import asyncio
import logging
import time
import gc
from pathlib import Path
from telethon import TelegramClient, events
from telethon.tl.types import MessageMediaDocument, InputDocumentFileLocation
from telethon.tl.functions.upload import GetFileRequest
from telethon.crypto import AES
from telethon.errors import FloodWaitError
import aiofiles
from concurrent.futures import ThreadPoolExecutor

# Garbage-collector thresholds: generation 0 keeps the value 700, the two
# older generations are collected every 10 collections of the younger one.
gc.set_threshold(700, 10, 10)

# NOTE(review): these variables are normally read by the interpreter at
# start-up, so assigning them here most likely only affects child processes
# spawned later, not the running process - confirm this is the intent.
os.environ.update(
    PYTHONUNBUFFERED='1',
    PYTHONDONTWRITEBYTECODE='1',
)

# Root logger: INFO and above, timestamped records.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO,
)

# Telegram credentials and target chat, all taken from the environment.
# The two int() conversions raise TypeError when the variable is unset.
TELEGRAM_API_ID = int(os.environ.get("TELEGRAM_API_ID"))
TELEGRAM_API_HASH = os.environ.get("TELEGRAM_API_HASH")
TELEGRAM_SESSION_NAME = os.path.join(
    'session',
    os.environ.get('TELEGRAM_SESSION_NAME', 'bot_session'),
)
TELEGRAM_GROUP_ID = int(os.environ.get("GROUP_CHAT_ID"))

# Human-readable topic name -> forum topic id to scan.
TOPIC_IDS = {'Doc 1': 137}

# Forum topic id -> category key.
TOPIC_ID_TO_CATEGORY = {137: 'doc 1'}

# Category key -> base directory where files of that category are stored.
CATEGORY_TO_DIRECTORY = {'doc 1': '/mnt/disco1/test'}

class FastTelegramDownloader:
    """Download Telegram documents through a single client.

    Large documents are fetched as fixed-size chunks with several
    ``GetFileRequest`` calls in flight at once; small documents, and any
    document whose chunked download fails, go through the client's regular
    ``download_file`` method.
    """

    # Documents smaller than this always use the standard download.
    _FAST_PATH_MIN_BYTES = 10 * 1024 * 1024
    # Alignment / upper bound applied to the chunk size before it is used as
    # the ``limit`` of a GetFileRequest (see the Telegram file download docs).
    _CHUNK_ALIGNMENT = 4096
    _MAX_CHUNK_BYTES = 1024 * 1024

    def __init__(self, client, max_concurrent_downloads=4):
        """Store the client and create the semaphore bounding parallel requests."""
        self.client = client
        self.max_concurrent_downloads = max_concurrent_downloads
        self.semaphore = asyncio.Semaphore(max_concurrent_downloads)

    @staticmethod
    def _is_valid_chunk_size(chunk_size):
        """Return True when *chunk_size* is usable as a request ``limit``.

        It must be a positive integer, a multiple of 4 KiB, and divide 1 MiB
        evenly (which also caps it at 1 MiB).
        """
        return (
            isinstance(chunk_size, int)
            and chunk_size > 0
            and chunk_size % FastTelegramDownloader._CHUNK_ALIGNMENT == 0
            and FastTelegramDownloader._MAX_CHUNK_BYTES % chunk_size == 0
        )

    @staticmethod
    def _plan_chunks(file_size, chunk_size):
        """Split ``[0, file_size)`` into consecutive chunk descriptors.

        Every descriptor requests a full ``chunk_size`` (``limit``) so the
        request stays aligned; ``size`` is the number of bytes that chunk is
        actually expected to contribute (shorter only for the final chunk).
        """
        return [
            {
                'id': index,
                'offset': offset,
                'limit': chunk_size,
                'size': min(chunk_size, file_size - offset),
            }
            for index, offset in enumerate(range(0, file_size, chunk_size))
        ]

    @staticmethod
    def _part_size_kb_for(file_size):
        """Pick the ``part_size_kb`` passed to the standard download."""
        if file_size > 50 * 1024 * 1024:   # >50MB
            return 1024
        if file_size > 10 * 1024 * 1024:   # >10MB
            return 512
        return 256

    async def download_file_fast(self, message, dest_path, chunk_size=1024*1024, progress_callback=None):
        """Download the document attached to *message* into *dest_path*.

        Args:
            message: message whose ``media.document`` is downloaded.
            dest_path: destination file path; overwritten if it exists.
            chunk_size: bytes requested per chunk. Values that are not a
                positive multiple of 4 KiB dividing 1 MiB make the call use
                the standard download instead.
            progress_callback: optional ``callback(done_bytes, total_bytes)``.

        Returns:
            *dest_path*.

        Raises:
            Whatever the standard download raises when it is used (directly
            or as the fallback after a failed chunked download).
        """
        document = message.media.document
        file_size = document.size

        # For smaller files, use standard download
        if file_size < self._FAST_PATH_MIN_BYTES:
            return await self._standard_download(message, dest_path, progress_callback)

        if not self._is_valid_chunk_size(chunk_size):
            # An unaligned size would be rejected per request and a
            # non-positive one would never advance the offset.
            logging.warning(f"Invalid chunk size {chunk_size}, falling back to standard download")
            return await self._standard_download(message, dest_path, progress_callback)

        input_location = InputDocumentFileLocation(
            id=document.id,
            access_hash=document.access_hash,
            file_reference=document.file_reference,
            thumb_size=""
        )

        chunks = self._plan_chunks(file_size, chunk_size)
        logging.info(f"📦 Dividiendo archivo en {len(chunks)} chunks de ~{chunk_size//1024}KB")

        downloaded_bytes = 0
        start_time = time.time()

        async def download_chunk(chunk):
            """Fetch one chunk and return exactly the bytes it should contain."""
            nonlocal downloaded_bytes
            async with self.semaphore:
                result = await self.client(GetFileRequest(
                    location=input_location,
                    offset=chunk['offset'],
                    limit=chunk['limit']
                ))

            data = result.bytes[:chunk['size']]
            if len(data) != chunk['size']:
                raise IOError(
                    f"Chunk {chunk['id']} returned {len(data)} bytes, expected {chunk['size']}"
                )

            downloaded_bytes += len(data)
            if progress_callback:
                progress_callback(downloaded_bytes, file_size)
            return data

        # Chunks are fetched in batches and written in order as soon as a
        # batch completes, so at most one batch of data is held in memory.
        batch_size = max(1, self.max_concurrent_downloads)

        try:
            async with aiofiles.open(dest_path, 'wb') as out_file:
                for start in range(0, len(chunks), batch_size):
                    batch = chunks[start:start + batch_size]
                    outcomes = await asyncio.gather(
                        *(download_chunk(chunk) for chunk in batch),
                        return_exceptions=True
                    )
                    for chunk, outcome in zip(batch, outcomes):
                        if isinstance(outcome, BaseException):
                            logging.error(f"Error downloading chunk {chunk['id']}: {outcome}")
                            raise RuntimeError(f"Missing chunk {chunk['id']}") from outcome
                        await out_file.write(outcome)

            duration = time.time() - start_time
            speed = (file_size / 1024 / 1024) / duration if duration > 0 else 0
            logging.info(f"✅ Fast download completed: {dest_path} - Speed: {speed:.2f} MB/s")
            return dest_path

        except Exception as e:
            # The standard download rewrites dest_path from scratch, so a
            # partially written file left behind here is harmless.
            logging.error(f"Fast download failed: {e}")
            return await self._standard_download(message, dest_path, progress_callback)

    async def _standard_download(self, message, dest_path, progress_callback=None):
        """Download the document with the client's own ``download_file``.

        The part size grows with the document size. Returns *dest_path*;
        exceptions raised by the client propagate to the caller.
        """
        document = message.media.document
        file_size = document.size

        start_time = time.time()

        await self.client.download_file(
            document,
            file=dest_path,
            part_size_kb=self._part_size_kb_for(file_size),
            file_size=file_size,
            progress_callback=progress_callback
        )

        duration = time.time() - start_time
        speed = (file_size / 1024 / 1024) / duration if duration > 0 else 0
        logging.info(f"📊 Standard download speed: {speed:.2f} MB/s")
        return dest_path

class MultiClientDownloader:
    """Own a pool of Telegram clients and hand them out round-robin.

    Each client gets its own session file (``<session_base_name>_<index>``)
    and its own ``FastTelegramDownloader``.
    """

    def __init__(self, api_id, api_hash, session_base_name, num_clients=3):
        """Record the connection settings; no client is created yet."""
        self.api_id = api_id
        self.api_hash = api_hash
        self.session_base_name = session_base_name
        self.num_clients = num_clients
        self.clients = []
        self.client_index = 0
        self.fast_downloaders = []

    async def initialize_clients(self):
        """Create and start ``num_clients`` clients, one downloader per client.

        An exception raised while starting a client propagates; clients
        started before it stay in ``self.clients`` so they can still be
        closed with ``close_all_clients``.
        """
        for i in range(self.num_clients):
            session_name = f"{self.session_base_name}_{i}"
            client = TelegramClient(
                session_name,
                self.api_id,
                self.api_hash,
                connection_retries=3,
                auto_reconnect=True,
                timeout=300,
                request_retries=3,
                flood_sleep_threshold=60,
                system_version="4.16.30-vxCUSTOM",
                device_model="HighSpeedDownloader",
                lang_code="es",
                system_lang_code="es",
                use_ipv6=False
            )
            await client.start()
            self.clients.append(client)
            self.fast_downloaders.append(FastTelegramDownloader(client, max_concurrent_downloads=2))
            logging.info(f"✅ Cliente {i+1}/{self.num_clients} inicializado")

    def get_next_client(self):
        """Return the next ``(client, downloader)`` pair in round-robin order.

        The rotation is based on the clients that were actually initialised,
        not on the requested ``num_clients``, so a partially filled pool
        still cycles correctly.

        Raises:
            IndexError: if no client has been initialised.
        """
        if not self.clients:
            raise IndexError("No Telegram clients have been initialized")

        pool_size = len(self.clients)
        slot = self.client_index % pool_size
        self.client_index = (slot + 1) % pool_size
        return self.clients[slot], self.fast_downloaders[slot]

    async def close_all_clients(self):
        """Disconnect every client, continuing past individual failures."""
        for position, client in enumerate(self.clients, start=1):
            try:
                await client.disconnect()
            except Exception as exc:
                # Keep going: one failing disconnect must not leak the rest.
                logging.warning(f"Error disconnecting client {position}: {exc}")

class TelegramDownloader:
    """Scan forum topics and download their documents into category folders.

    The target path of every document is derived from ``Key: value`` lines
    in the message caption. Ids of processed messages are appended to
    ``DOWNLOAD_LOG_PATH`` so later runs skip them.
    """

    # Text file holding one already-processed message id per line.
    DOWNLOAD_LOG_PATH = '/app/data/downloaded.log'
    # Documents larger than this use the chunked "fast" download.
    _FAST_DOWNLOAD_MIN_BYTES = 20 * 1024 * 1024
    # Caption fields that must be present for a message to be processed.
    _REQUIRED_FIELDS = (
        'type', 'tmdb_id', 'file_name_telegram',
        'file_name', 'folder_name', 'season_folder',
    )
    # One "Key: value" pair per line; neither key nor value may span lines.
    _METADATA_LINE = re.compile(r'^(\w[\w \t]*):[ \t]*(.*)$', re.MULTILINE)

    def __init__(self, multi_client_downloader):
        """Bind the client pool and load the ids of already-processed messages."""
        self.multi_client = multi_client_downloader
        self.downloaded_files = set()
        self.load_downloaded_files()
        self.current_download = None
        self.download_stats = {
            'total_files': 0,
            'total_bytes': 0,
            'total_time': 0
        }

    def _create_download_progress_logger(self, filename):
        """Build a ``callback(done_bytes, total_bytes)`` that logs progress.

        A line is logged when progress advanced at least 10 percentage
        points since the last line, or at least 5 seconds have passed.
        """
        start_time = time.time()
        last_logged_time = start_time
        last_percent_reported = -5

        MIN_STEP = 10  # Report every 10%
        MIN_INTERVAL = 5  # Or every 5 seconds

        def progress_bar_function(done_bytes, total_bytes):
            nonlocal last_logged_time, last_percent_reported

            current_time = time.time()
            # An unknown / zero total must not break the download.
            percent_now = int((done_bytes / total_bytes) * 100) if total_bytes else 0

            if (percent_now - last_percent_reported < MIN_STEP and
                    current_time - last_logged_time < MIN_INTERVAL):
                return

            last_percent_reported = percent_now
            last_logged_time = current_time

            elapsed = (current_time - start_time) or 1
            speed = done_bytes / 1024 / 1024 / elapsed
            msg = (f"⏬ {filename} | "
                f"{percent_now}% | "
                f"{speed:.1f} MB/s | "
                f"{done_bytes/1024/1024:.1f}/{total_bytes/1024/1024:.1f} MB")
            logging.info(msg)

        return progress_bar_function

    async def _process_download(self, message, metadata, filename, dest_path):
        """Download one document to a temp file, then move it to *dest_path*.

        Args:
            message: message carrying the document.
            metadata: parsed caption fields; ``file_name_telegram`` names the
                temporary file.
            filename: name used in log lines.
            dest_path: final ``Path`` of the file.

        Raises:
            FloodWaitError: propagated untouched so the caller can wait.
            Exception: any other download / filesystem error, after the
                temporary file has been removed.
        """
        temp_path = None
        self.current_download = filename
        try:
            logging.info(f"🚀 Iniciando descarga de: {filename}")

            progress_logger = self._create_download_progress_logger(filename)
            temp_path = dest_path.with_name(f"temp_{metadata['file_name_telegram']}")

            # Get next available client and downloader
            client, fast_downloader = self.multi_client.get_next_client()

            file_size = message.media.document.size
            start_time = time.time()

            try:
                if file_size > self._FAST_DOWNLOAD_MIN_BYTES:
                    logging.info(f"📦 Usando descarga rápida para archivo de {file_size/1024/1024:.1f}MB")
                    await fast_downloader.download_file_fast(
                        message, temp_path, progress_callback=progress_logger
                    )
                else:
                    await fast_downloader._standard_download(
                        message, temp_path, progress_callback=progress_logger
                    )
            except FloodWaitError:
                # Retrying immediately would hit the same limit again.
                raise
            except Exception as download_error:
                logging.warning(f"Descarga optimizada falló, usando método estándar: {download_error}")
                # Final fallback to basic download
                await client.download_file(
                    message.media.document,
                    file=temp_path,
                    part_size_kb=512,
                    file_size=file_size,
                    progress_callback=progress_logger
                )

            if not temp_path.exists():
                raise FileNotFoundError("No se encontró el archivo descargado")

            # Same-directory rename, so the final name appears only when complete.
            temp_path.rename(dest_path)

            duration = time.time() - start_time
            speed = (file_size / 1024 / 1024) / duration if duration > 0 else 0

            stats = self.download_stats
            stats['total_files'] += 1
            stats['total_bytes'] += file_size
            stats['total_time'] += duration

            avg_speed = (stats['total_bytes'] / 1024 / 1024) / stats['total_time'] if stats['total_time'] > 0 else 0

            logging.info(f"✅ Descarga completada: {dest_path}")
            logging.info(f"📊 Velocidad: {speed:.2f} MB/s | Promedio sesión: {avg_speed:.2f} MB/s")

            self.save_downloaded_file(str(message.id))

        except Exception as e:
            logging.error(f"❌ Error en descarga: {str(e)}", exc_info=True)
            # Only the temporary file is removed: dest_path exists at this
            # point only if the download completed and was renamed.
            if temp_path is not None:
                try:
                    if temp_path.exists():
                        temp_path.unlink()
                except OSError as cleanup_error:
                    logging.warning(f"Could not remove temporary file {temp_path}: {cleanup_error}")
            raise
        finally:
            self.current_download = None

    def load_downloaded_files(self):
        """Load processed message ids from ``DOWNLOAD_LOG_PATH`` (best effort)."""
        try:
            if os.path.exists(self.DOWNLOAD_LOG_PATH):
                with open(self.DOWNLOAD_LOG_PATH, 'r', encoding='utf-8') as f:
                    self.downloaded_files = {line.strip() for line in f if line.strip()}
                logging.info(f"📋 Cargados {len(self.downloaded_files)} archivos ya descargados")
        except Exception as e:
            logging.error(f"Error cargando archivos descargados: {str(e)}")

    def save_downloaded_file(self, file_id):
        """Append *file_id* to the log file and the in-memory set (best effort)."""
        try:
            with open(self.DOWNLOAD_LOG_PATH, 'a', encoding='utf-8') as f:
                f.write(f"{file_id}\n")
            self.downloaded_files.add(file_id)
        except Exception as e:
            logging.error(f"Error guardando archivo descargado: {str(e)}")

    def parse_metadata(self, caption):
        """Parse ``Key: value`` caption lines into a dict.

        Keys are lower-cased with spaces replaced by underscores. Lines
        without a ``key:`` prefix (titles, blank lines, decorations) are
        ignored and never affect neighbouring fields. ``season`` and
        ``episode`` are converted to ``int`` when present.

        Returns:
            The metadata dict, or ``None`` when the caption is empty, a
            required field is missing, or a value cannot be converted.
        """
        metadata = {}
        try:
            if not caption:
                logging.debug("📂 No hay caption")
                return None

            for key, value in self._METADATA_LINE.findall(caption):
                key = key.strip().lower().replace(' ', '_')
                metadata[key] = value.strip()

            if not all(field in metadata for field in self._REQUIRED_FIELDS):
                return None

            for numeric_field in ('season', 'episode'):
                if numeric_field in metadata:
                    metadata[numeric_field] = int(metadata[numeric_field])

            return metadata

        except Exception as e:
            logging.error(f"Error parseando metadata: {str(e)}")
            return None

    @staticmethod
    def _resolve_topic_id(message):
        """Return the forum topic id of *message*, or ``None``.

        When the message replies to another message inside a topic, the
        topic is carried by ``reply_to_top_id``; otherwise it is
        ``reply_to_msg_id``.
        """
        reply_to = getattr(message, 'reply_to', None)
        if not reply_to:
            return None
        return (getattr(reply_to, 'reply_to_top_id', None)
                or getattr(reply_to, 'reply_to_msg_id', None))

    @staticmethod
    def _is_within_directory(base_dir, candidate):
        """Return True when *candidate* lies strictly inside *base_dir*.

        The comparison is purely lexical (``..`` segments are collapsed,
        symlinks are not followed).
        """
        base = os.path.normpath(str(base_dir))
        target = os.path.normpath(str(candidate))
        try:
            return target != base and os.path.commonpath([base, target]) == base
        except ValueError:
            # Raised e.g. when absolute and relative paths are mixed.
            return False

    def get_destination_path(self, message, metadata):
        """Compute the final ``Path`` for the document, or ``None``.

        ``None`` is returned (after logging the reason) when the topic,
        category, directory, file name or content type cannot be resolved,
        or when the caption-provided names would place the file outside the
        category's base directory.
        """
        try:
            topic_id = self._resolve_topic_id(message)
            if not topic_id:
                logging.warning("No se pudo determinar el topic ID del mensaje")
                return None

            category = TOPIC_ID_TO_CATEGORY.get(topic_id)
            if not category:
                logging.warning(f"No se encontró categoría para el topic ID: {topic_id}")
                return None

            base_dir = CATEGORY_TO_DIRECTORY.get(category)
            if not base_dir:
                logging.warning(f"No hay directorio configurado para la categoría: {category}")
                return None

            filename = metadata.get('file_name')
            if not filename:
                logging.warning("Campo 'file_name' no encontrado en metadatos")
                return None

            content_type = metadata['type']
            if content_type == 'movie':
                dest_dir = Path(base_dir) / f"{metadata['folder_name']}"
            elif content_type == 'tv':
                season_folder = metadata.get('season_folder', 'Season 01')
                dest_dir = Path(base_dir) / f"{metadata['folder_name']}" / season_folder
            else:
                logging.warning(f"Tipo de contenido no soportado: {content_type}")
                return None

            destination = dest_dir / filename
            # Folder and file names come from the message caption; refuse
            # anything that would escape the configured base directory.
            if not self._is_within_directory(base_dir, destination):
                logging.warning(f"Destination outside of base directory rejected: {destination}")
                return None
            return destination

        except Exception as e:
            logging.error(f"Error determinando ruta de destino: {str(e)}")
            return None

    async def download_file(self, message):
        """Validate *message* and download its document if still needed.

        Errors are logged and swallowed, except ``FloodWaitError`` which is
        re-raised so the caller can pause before continuing.
        """
        try:
            if not isinstance(message.media, MessageMediaDocument):
                return

            if str(message.id) in self.downloaded_files:
                logging.debug(f"Archivo ya descargado (msg_id: {message.id})")
                return

            metadata = self.parse_metadata(message.message)
            if not metadata:
                logging.warning("No se pudieron extraer metadatos válidos")
                return

            if 'file_name' not in metadata or not metadata['file_name']:
                logging.warning("El campo 'file_name' es obligatorio en los metadatos")
                return

            dest_path = self.get_destination_path(message, metadata)
            if not dest_path:
                return

            dest_path.parent.mkdir(parents=True, exist_ok=True)

            if dest_path.exists():
                logging.info(f"Archivo ya existe: {dest_path}")
                self.save_downloaded_file(str(message.id))
                return

            # Pace real downloads only; skipped messages cost no delay.
            await asyncio.sleep(1)

            await self._process_download(message, metadata, metadata['file_name'], dest_path)

        except FloodWaitError:
            raise
        except Exception as e:
            logging.error(f"Error descargando archivo: {str(e)}", exc_info=True)

    async def process_topic(self, topic_id, limit=None):
        """Iterate the messages of one topic and download their documents.

        Args:
            topic_id: id of the forum topic to scan.
            limit: maximum number of messages to iterate (``None`` = all).

        A flood wait pauses the loop for the requested time plus 5 seconds;
        the affected message is not marked as processed, so a later run
        picks it up again.
        """
        try:
            logging.info(f"📂 Procesando topic ID: {topic_id}")

            # Use first client for message iteration
            client = self.multi_client.clients[0]

            async for message in client.iter_messages(
                TELEGRAM_GROUP_ID,
                limit=limit,
                reply_to=topic_id,
                wait_time=10
            ):
                try:
                    if message.media and isinstance(message.media, MessageMediaDocument):
                        await self.download_file(message)

                        # Small delay between downloads to prevent rate limiting
                        await asyncio.sleep(0.5)

                except FloodWaitError as e:
                    wait_time = e.seconds + 5
                    logging.warning(f"⚠️ Flood wait detectado. Esperando {wait_time} segundos...")
                    await asyncio.sleep(wait_time)
                    continue
                except Exception as e:
                    logging.error(f"Error procesando mensaje: {str(e)}", exc_info=True)
                    continue

        except Exception as e:
            logging.error(f"Error procesando topic {topic_id}: {str(e)}", exc_info=True)

    async def process_all_topics(self):
        """Process every topic in ``TOPIC_IDS`` and log cumulative statistics."""
        for topic_name, topic_id in TOPIC_IDS.items():
            logging.info(f"🎯 Iniciando procesamiento de: {topic_name}")
            await self.process_topic(topic_id)

            # The counters accumulate over the whole session.
            stats = self.download_stats
            if stats['total_files'] > 0:
                total_mb = stats['total_bytes'] / 1024 / 1024
                avg_speed = total_mb / stats['total_time'] if stats['total_time'] > 0 else 0
                logging.info(f"📊 Estadísticas del topic {topic_name}:")
                logging.info(f"   📁 Archivos: {stats['total_files']}")
                logging.info(f"   💾 Total: {stats['total_bytes']/1024/1024/1024:.2f} GB")
                logging.info(f"   ⚡ Velocidad promedio: {avg_speed:.2f} MB/s")

async def main():
    """Run the downloader end to end.

    Steps: AES self-test, creation of the session/data directories,
    start-up of the client pool, processing of every configured topic and,
    in all cases, disconnection of the clients.

    Raises:
        SystemExit: with code 1 when the AES self-test fails.
    """
    # Local import: only needed for the cryptg probe below.
    import importlib.util

    try:
        # Round-trip random data through the AES-IGE helpers.
        test_data = os.urandom(1024)
        key = os.urandom(32)
        iv = os.urandom(32)

        encrypted = AES.encrypt_ige(test_data, key, iv)
        decrypted = AES.decrypt_ige(encrypted, key, iv)
        if decrypted != test_data:
            raise RuntimeError("❌ Cryptg does not work properly")

        # The round trip alone does not prove cryptg is installed, so the
        # module is probed explicitly before claiming it is available.
        if importlib.util.find_spec("cryptg") is None:
            logging.warning("⚠️ cryptg is not installed; AES self-test passed without it (downloads may be slower)")
        else:
            logging.info("✅ cryptg available and working")
    except Exception as e:
        logging.critical(f"❌ ERROR ON CRYPTG: {str(e)}")
        raise SystemExit(1)

    # Ensure the session and data directories exist
    os.makedirs('session', exist_ok=True)
    os.makedirs('/app/data', exist_ok=True)

    # Initialize multi-client downloader
    multi_client = MultiClientDownloader(
        TELEGRAM_API_ID,
        TELEGRAM_API_HASH,
        TELEGRAM_SESSION_NAME,
        num_clients=3
    )

    try:
        logging.info("🚀 Inicializando clientes múltiples...")
        await multi_client.initialize_clients()

        downloader = TelegramDownloader(multi_client)

        logging.info("📥 Iniciando descarga de todos los topics...")
        await downloader.process_all_topics()

        logging.info("✅ Proceso completado exitosamente")

    except Exception as e:
        logging.error(f"Error en main: {str(e)}", exc_info=True)
    finally:
        # Runs on success and on failure so no connection is left open.
        logging.info("🔌 Cerrando conexiones...")
        await multi_client.close_all_clients()


# Script entry point: run the coroutine on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())
Reasons:
  • Blacklisted phrase (1): está
  • RegEx Blacklisted phrase (2): encontr
  • RegEx Blacklisted phrase (2): encontrado
  • Long answer (-1):
  • Has code block (-0.5):
  • Low reputation (1):
Posted by: anoop