import os
import re
import asyncio
import logging
import time
import gc
from pathlib import Path
from telethon import TelegramClient, events
from telethon.tl.types import MessageMediaDocument, InputDocumentFileLocation
from telethon.tl.functions.upload import GetFileRequest
from telethon.crypto import AES
from telethon.errors import FloodWaitError
import aiofiles
from concurrent.futures import ThreadPoolExecutor
# Set the garbage-collector thresholds for the three generations.
# NOTE(review): (700, 10, 10) looks like CPython's long-standing default, so
# this call may be a no-op on the interpreter in use — confirm.
gc.set_threshold(700, 10, 10)
# Export interpreter tuning variables into this process's environment.
# NOTE(review): these variables are normally read at interpreter start-up, so
# setting them here presumably only affects child processes — confirm.
os.environ['PYTHONUNBUFFERED'] = '1'
os.environ['PYTHONDONTWRITEBYTECODE'] = '1'
# Root logger configuration: INFO level with timestamped records.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
# Telegram credentials and target group, read from the environment at import
# time. os.getenv returns None for an unset variable, so int(...) raises
# TypeError here if TELEGRAM_API_ID or GROUP_CHAT_ID is missing.
TELEGRAM_API_ID = int(os.getenv("TELEGRAM_API_ID"))
TELEGRAM_API_HASH = os.getenv("TELEGRAM_API_HASH")
# Session files live under the relative 'session' directory.
TELEGRAM_SESSION_NAME = os.path.join('session', os.getenv('TELEGRAM_SESSION_NAME', 'bot_session'))
TELEGRAM_GROUP_ID = int(os.getenv("GROUP_CHAT_ID"))
# Display name -> forum topic id; iterated by process_all_topics, where the
# key is only used in log messages.
TOPIC_IDS = {
    'Doc 1': 137,
}
# Forum topic id -> category key (looked up in get_destination_path).
TOPIC_ID_TO_CATEGORY = {
    137: 'doc 1',
}
# Category key -> base directory under which downloads are stored.
CATEGORY_TO_DIRECTORY = {
    'doc 1': '/mnt/disco1/test',
}
class FastTelegramDownloader:
    """Download Telegram documents, fetching large files as concurrent chunks.

    Large files are split into fixed-size parts that are requested in
    parallel through ``upload.getFile``; anything that goes wrong falls back
    to the client's regular ``download_file``.
    """

    # Files smaller than this always use the standard downloader.
    FAST_DOWNLOAD_MIN_SIZE = 10 * 1024 * 1024
    # upload.getFile requires ``limit`` to be a multiple of 4 KB that evenly
    # divides 1 MB (see the Telegram file download documentation).
    _GETFILE_ALIGNMENT = 4096
    _GETFILE_MAX_LIMIT = 1024 * 1024

    def __init__(self, client, max_concurrent_downloads=4):
        """Store the client and create the semaphore bounding parallel requests.

        Args:
            client: Connected Telegram client used for every request.
            max_concurrent_downloads: Maximum number of chunk requests in flight.
        """
        self.client = client
        self.max_concurrent_downloads = max_concurrent_downloads
        self.semaphore = asyncio.Semaphore(max_concurrent_downloads)

    @classmethod
    def _is_valid_chunk_size(cls, chunk_size):
        """Return True if ``chunk_size`` is usable as an upload.getFile limit."""
        return (
            isinstance(chunk_size, int)
            and 0 < chunk_size <= cls._GETFILE_MAX_LIMIT
            and chunk_size % cls._GETFILE_ALIGNMENT == 0
            and cls._GETFILE_MAX_LIMIT % chunk_size == 0
        )

    async def download_file_fast(self, message, dest_path, chunk_size=1024*1024, progress_callback=None):
        """Download ``message``'s document to ``dest_path`` using parallel chunks.

        Files below ``FAST_DOWNLOAD_MIN_SIZE``, an unusable ``chunk_size`` or
        any failure while fetching chunks all fall back to
        ``_standard_download``.

        Args:
            message: Message whose ``media.document`` is downloaded.
            dest_path: Destination file path (overwritten if present).
            chunk_size: Bytes requested per chunk; must satisfy the
                upload.getFile limit rules (multiple of 4 KB dividing 1 MB).
            progress_callback: Optional ``callback(done_bytes, total_bytes)``.

        Returns:
            ``dest_path``.
        """
        document = message.media.document
        file_size = document.size

        # For smaller files, use standard download
        if file_size < self.FAST_DOWNLOAD_MIN_SIZE:
            return await self._standard_download(message, dest_path, progress_callback)

        if not self._is_valid_chunk_size(chunk_size):
            logging.warning(
                f"Invalid chunk_size {chunk_size} for upload.getFile, "
                "falling back to standard download"
            )
            return await self._standard_download(message, dest_path, progress_callback)

        # Create input location for the file
        input_location = InputDocumentFileLocation(
            id=document.id,
            access_hash=document.access_hash,
            file_reference=document.file_reference,
            thumb_size=""
        )

        # Every chunk starts at a multiple of chunk_size, which keeps offsets
        # aligned and each request inside a single 1 MB window.
        offsets = range(0, file_size, chunk_size)
        logging.info(f"📦 Dividiendo archivo en {len(offsets)} chunks de ~{chunk_size//1024}KB")

        downloaded_bytes = 0
        start_time = time.time()
        # seek + write must not interleave between concurrent chunk tasks.
        write_lock = asyncio.Lock()

        async def fetch_chunk(out_file, offset):
            """Fetch one chunk and write it at its offset in ``out_file``."""
            nonlocal downloaded_bytes
            expected = min(chunk_size, file_size - offset)
            # Writing happens while the semaphore is held so that at most
            # ``max_concurrent_downloads`` chunks are buffered in memory.
            async with self.semaphore:
                # Always request a full chunk_size: a limit equal to the
                # remaining byte count is generally not 4 KB aligned and is
                # rejected; the server simply returns fewer bytes at EOF.
                result = await self.client(GetFileRequest(
                    location=input_location,
                    offset=offset,
                    limit=chunk_size
                ))
                data = result.bytes[:expected]
                if len(data) != expected:
                    raise IOError(
                        f"Chunk at offset {offset} returned {len(data)} bytes, "
                        f"expected {expected}"
                    )
                async with write_lock:
                    await out_file.seek(offset)
                    await out_file.write(data)
            # Update progress
            downloaded_bytes += expected
            if progress_callback:
                progress_callback(downloaded_bytes, file_size)

        try:
            async with aiofiles.open(dest_path, 'wb') as out_file:
                tasks = [
                    asyncio.ensure_future(fetch_chunk(out_file, offset))
                    for offset in offsets
                ]
                try:
                    await asyncio.gather(*tasks)
                except BaseException:
                    # Stop the remaining chunk requests before the file is
                    # closed and before the fallback starts reporting progress.
                    for task in tasks:
                        task.cancel()
                    await asyncio.gather(*tasks, return_exceptions=True)
                    raise
        except Exception as e:
            # NOTE(review): this presumably also covers documents stored on a
            # different data centre than the client's connection — confirm.
            logging.error(f"Fast download failed: {e}")
            return await self._standard_download(message, dest_path, progress_callback)

        duration = time.time() - start_time
        speed = (file_size / 1024 / 1024) / duration if duration > 0 else 0
        logging.info(f"✅ Fast download completed: {dest_path} - Speed: {speed:.2f} MB/s")
        return dest_path

    async def _standard_download(self, message, dest_path, progress_callback=None):
        """Download the document with the client's own ``download_file``.

        The part size grows with the file size: 256 KB up to 10 MB, 512 KB up
        to 50 MB and 1024 KB above that.

        Returns:
            ``dest_path``.
        """
        document = message.media.document
        file_size = document.size

        # Optimize chunk size based on file size
        # NOTE(review): the client library may cap the request size below
        # 1024 KB — confirm that the largest setting has an effect.
        if file_size > 50 * 1024 * 1024:  # >50MB
            part_size_kb = 1024  # 1MB chunks
        elif file_size > 10 * 1024 * 1024:  # >10MB
            part_size_kb = 512  # 512KB chunks
        else:
            part_size_kb = 256  # 256KB chunks

        start_time = time.time()
        await self.client.download_file(
            document,
            file=dest_path,
            part_size_kb=part_size_kb,
            file_size=file_size,
            progress_callback=progress_callback
        )
        duration = time.time() - start_time
        speed = (file_size / 1024 / 1024) / duration if duration > 0 else 0
        logging.info(f"📊 Standard download speed: {speed:.2f} MB/s")
        return dest_path
class MultiClientDownloader:
    """Own a pool of Telegram clients and hand them out round-robin."""

    def __init__(self, api_id, api_hash, session_base_name, num_clients=3):
        """Record connection settings; no client is created until
        ``initialize_clients`` is awaited.

        Args:
            api_id: Telegram API id.
            api_hash: Telegram API hash.
            session_base_name: Prefix for the per-client session names
                (``<prefix>_<index>``).
            num_clients: Number of clients to create.
        """
        self.api_id = api_id
        self.api_hash = api_hash
        self.session_base_name = session_base_name
        self.num_clients = num_clients
        self.clients = []
        self.client_index = 0
        self.fast_downloaders = []

    async def initialize_clients(self):
        """Create and start ``num_clients`` clients, each with its own
        ``FastTelegramDownloader``.

        If a client fails to start it is disconnected and the error is
        re-raised; clients started before it stay registered so that
        ``close_all_clients`` can shut them down.
        """
        for i in range(self.num_clients):
            session_name = f"{self.session_base_name}_{i}"
            client = TelegramClient(
                session_name,
                self.api_id,
                self.api_hash,
                connection_retries=3,
                auto_reconnect=True,
                timeout=300,
                request_retries=3,
                flood_sleep_threshold=60,
                system_version="4.16.30-vxCUSTOM",
                device_model="HighSpeedDownloader",
                lang_code="es",
                system_lang_code="es",
                use_ipv6=False
            )
            try:
                await client.start()
            except BaseException:
                # The client is not in self.clients yet, so nobody else
                # would ever close its connection.
                try:
                    await client.disconnect()
                except Exception as e:
                    logging.error(f"Error disconnecting client: {e}")
                raise
            self.clients.append(client)
            self.fast_downloaders.append(FastTelegramDownloader(client, max_concurrent_downloads=2))
            logging.info(f"✅ Cliente {i+1}/{self.num_clients} inicializado")

    def get_next_client(self):
        """Return the next ``(client, fast_downloader)`` pair, round-robin.

        The rotation runs over the clients that were actually initialised,
        which may be fewer than ``num_clients``.

        Raises:
            RuntimeError: If no client has been initialised.
        """
        available = len(self.clients)
        if not available:
            raise RuntimeError("No Telegram clients have been initialized")
        index = self.client_index % available
        self.client_index = (index + 1) % available
        return self.clients[index], self.fast_downloaders[index]

    async def close_all_clients(self):
        """Disconnect every client; a failing disconnect is logged and does
        not prevent the remaining clients from being closed."""
        for client in self.clients:
            try:
                await client.disconnect()
            except Exception as e:
                logging.error(f"Error disconnecting client: {e}")
class TelegramDownloader:
    """Walk forum topics, parse caption metadata and download each document
    into its destination directory, remembering processed message ids."""

    # Append-only log of message ids that have already been handled.
    DOWNLOAD_LOG_PATH = '/app/data/downloaded.log'
    # Caption fields that must be present for a message to be processed.
    REQUIRED_METADATA_FIELDS = (
        'type', 'tmdb_id', 'file_name_telegram',
        'file_name', 'folder_name', 'season_folder'
    )
    # One "Key: value" pair per line. Only spaces/tabs are allowed around the
    # colon so an empty value can never swallow the following line.
    _METADATA_LINE = re.compile(r'^[ \t]*(\w[\w \t]*):[ \t]*(.*)$', re.MULTILINE)

    def __init__(self, multi_client_downloader):
        """Load the processed-id log and reset the session statistics.

        Args:
            multi_client_downloader: Pool providing ``get_next_client()`` and
                a ``clients`` list.
        """
        self.multi_client = multi_client_downloader
        self.downloaded_files = set()
        self.load_downloaded_files()
        self.current_download = None
        self.download_stats = {
            'total_files': 0,
            'total_bytes': 0,
            'total_time': 0
        }

    def _create_download_progress_logger(self, filename):
        """Return a ``callback(done_bytes, total_bytes)`` that logs progress.

        A line is logged when progress advanced by at least 10 percentage
        points or at least 5 seconds passed since the previous line.
        """
        start_time = time.time()
        last_logged_time = start_time
        last_percent_reported = -5
        MIN_STEP = 10  # Report every 10%
        MIN_INTERVAL = 5  # Or every 5 seconds

        def progress_bar_function(done_bytes, total_bytes):
            nonlocal last_logged_time, last_percent_reported
            current_time = time.time()
            # An unknown/zero total must not crash the download.
            percent_now = int((done_bytes / total_bytes) * 100) if total_bytes else 0
            if (percent_now - last_percent_reported < MIN_STEP and
                    current_time - last_logged_time < MIN_INTERVAL):
                return
            last_percent_reported = percent_now
            last_logged_time = current_time
            speed = done_bytes / 1024 / 1024 / (current_time - start_time or 1)
            msg = (f"⏬ {filename} | "
                   f"{percent_now}% | "
                   f"{speed:.1f} MB/s | "
                   f"{done_bytes/1024/1024:.1f}/{total_bytes/1024/1024:.1f} MB")
            logging.info(msg)

        return progress_bar_function

    async def _process_download(self, message, metadata, filename, dest_path):
        """Download one document to a temp file, then rename it to ``dest_path``.

        Files above 20 MB go through the fast downloader, smaller ones through
        its standard path; if either raises, the client's plain
        ``download_file`` is used as a last resort. On success the session
        statistics are updated and the message id is recorded.

        Raises:
            Exception: Any download/rename error is logged, the temp file is
                removed, and the error is re-raised.
        """
        temp_path = None
        try:
            self.current_download = filename
            logging.info(f"🚀 Iniciando descarga de: {filename}")
            progress_logger = self._create_download_progress_logger(filename)
            temp_path = dest_path.with_name(f"temp_{metadata['file_name_telegram']}")

            # Get next available client and downloader
            client, fast_downloader = self.multi_client.get_next_client()
            file_size = message.media.document.size
            start_time = time.time()

            try:
                # Try fast download first for large files
                if file_size > 20 * 1024 * 1024:  # Files larger than 20MB
                    logging.info(f"📦 Usando descarga rápida para archivo de {file_size/1024/1024:.1f}MB")
                    await fast_downloader.download_file_fast(
                        message, temp_path, progress_callback=progress_logger
                    )
                else:
                    # Use standard optimized download for smaller files
                    await fast_downloader._standard_download(
                        message, temp_path, progress_callback=progress_logger
                    )
            except Exception as download_error:
                logging.warning(f"Descarga optimizada falló, usando método estándar: {download_error}")
                # Final fallback to basic download
                await client.download_file(
                    message.media.document,
                    file=temp_path,
                    part_size_kb=512,
                    file_size=file_size,
                    progress_callback=progress_logger
                )

            if not temp_path.exists():
                raise FileNotFoundError("No se encontró el archivo descargado")

            # Rename within the same directory so the final name only ever
            # refers to a complete file.
            temp_path.rename(dest_path)

            # Update statistics
            duration = time.time() - start_time
            speed = (file_size / 1024 / 1024) / duration if duration > 0 else 0
            self.download_stats['total_files'] += 1
            self.download_stats['total_bytes'] += file_size
            self.download_stats['total_time'] += duration
            avg_speed = (self.download_stats['total_bytes'] / 1024 / 1024) / self.download_stats['total_time'] if self.download_stats['total_time'] > 0 else 0
            logging.info(f"✅ Descarga completada: {dest_path}")
            logging.info(f"📊 Velocidad: {speed:.2f} MB/s | Promedio sesión: {avg_speed:.2f} MB/s")
            self.save_downloaded_file(str(message.id))
        except Exception as e:
            logging.error(f"❌ Error en descarga: {str(e)}", exc_info=True)
            # Only the partial temp file is removed: once the rename has
            # happened, dest_path is a complete download and must be kept.
            if temp_path is not None:
                try:
                    temp_path.unlink()
                except FileNotFoundError:
                    pass
                except OSError as cleanup_error:
                    logging.warning(f"No se pudo eliminar el archivo temporal {temp_path}: {cleanup_error}")
            raise
        finally:
            self.current_download = None

    def load_downloaded_files(self):
        """Populate ``downloaded_files`` from the log file, if it exists.
        Errors are logged and leave the current set untouched."""
        try:
            if os.path.exists(self.DOWNLOAD_LOG_PATH):
                with open(self.DOWNLOAD_LOG_PATH, 'r', encoding='utf-8') as f:
                    self.downloaded_files = set(line.strip() for line in f if line.strip())
                logging.info(f"📋 Cargados {len(self.downloaded_files)} archivos ya descargados")
        except Exception as e:
            logging.error(f"Error cargando archivos descargados: {str(e)}")

    def save_downloaded_file(self, file_id):
        """Append ``file_id`` to the log file and to the in-memory set.
        Errors are logged, not raised."""
        try:
            with open(self.DOWNLOAD_LOG_PATH, 'a', encoding='utf-8') as f:
                f.write(f"{file_id}\n")
            self.downloaded_files.add(file_id)
        except Exception as e:
            logging.error(f"Error guardando archivo descargado: {str(e)}")

    def parse_metadata(self, caption):
        """Parse ``Key: value`` lines of a caption into a dict.

        Keys are lower-cased with spaces replaced by underscores. ``season``
        and ``episode`` are converted to ``int`` when present.

        Returns:
            The metadata dict, or ``None`` when the caption is empty, a
            required field is missing, or parsing fails.
        """
        metadata = {}
        try:
            if not caption:
                logging.debug(f"📂 No hay caption")
                return None
            for match in self._METADATA_LINE.finditer(caption):
                key = match.group(1).strip().lower().replace(' ', '_')
                metadata[key] = match.group(2).strip()
            if not all(field in metadata for field in self.REQUIRED_METADATA_FIELDS):
                return None
            if 'season' in metadata:
                metadata['season'] = int(metadata['season'])
            if 'episode' in metadata:
                metadata['episode'] = int(metadata['episode'])
            return metadata
        except Exception as e:
            logging.error(f"Error parseando metadata: {str(e)}")
            return None

    @staticmethod
    def _resolve_inside(base_dir, *parts):
        """Join ``parts`` under ``base_dir``; return ``None`` if the result
        would fall outside ``base_dir`` (absolute part or ``..`` traversal)."""
        base = Path(base_dir)
        candidate = base.joinpath(*parts)
        try:
            candidate.resolve().relative_to(base.resolve())
        except ValueError:
            return None
        return candidate

    def get_destination_path(self, message, metadata):
        """Compute the destination ``Path`` for a message.

        The topic id (``message.reply_to.reply_to_msg_id``) selects a category
        and base directory; the caption's ``folder_name`` (plus
        ``season_folder`` for ``tv``) and ``file_name`` are appended.

        Returns:
            The destination path, or ``None`` when the topic/category is
            unknown, the type is unsupported, the file name is missing, or the
            caption-supplied components would leave the base directory.
        """
        try:
            topic_id = message.reply_to.reply_to_msg_id if message.reply_to else None
            if not topic_id:
                logging.warning("No se pudo determinar el topic ID del mensaje")
                return None
            category = TOPIC_ID_TO_CATEGORY.get(topic_id)
            if not category:
                logging.warning(f"No se encontró categoría para el topic ID: {topic_id}")
                return None
            base_dir = CATEGORY_TO_DIRECTORY.get(category)
            if not base_dir:
                logging.warning(f"No hay directorio configurado para la categoría: {category}")
                return None
            filename = metadata.get('file_name')
            if not filename:
                logging.warning("Campo 'file_name' no encontrado en metadatos")
                return None

            if metadata['type'] == 'movie':
                parts = (f"{metadata['folder_name']}", filename)
            elif metadata['type'] == 'tv':
                season_folder = metadata.get('season_folder', 'Season 01')
                parts = (f"{metadata['folder_name']}", season_folder, filename)
            else:
                logging.warning(f"Tipo de contenido no soportado: {metadata['type']}")
                return None

            # Captions are external input: never write outside base_dir.
            dest_path = self._resolve_inside(base_dir, *parts)
            if dest_path is None:
                logging.warning(f"Ruta de destino fuera del directorio base: {parts}")
            return dest_path
        except Exception as e:
            logging.error(f"Error determinando ruta de destino: {str(e)}")
            return None

    async def download_file(self, message):
        """Validate one message and download its document if needed.

        Messages without a document, already recorded ids, invalid metadata
        and existing destination files are skipped. All exceptions are logged
        here and not propagated.
        """
        try:
            await asyncio.sleep(1)  # Reduced delay
            if not isinstance(message.media, MessageMediaDocument):
                return
            if str(message.id) in self.downloaded_files:
                logging.debug(f"Archivo ya descargado (msg_id: {message.id})")
                return
            metadata = self.parse_metadata(message.message)
            if not metadata:
                logging.warning("No se pudieron extraer metadatos válidos")
                return
            if 'file_name' not in metadata or not metadata['file_name']:
                logging.warning("El campo 'file_name' es obligatorio en los metadatos")
                return
            dest_path = self.get_destination_path(message, metadata)
            if not dest_path:
                return
            dest_path.parent.mkdir(parents=True, exist_ok=True)
            if dest_path.exists():
                logging.info(f"Archivo ya existe: {dest_path}")
                self.save_downloaded_file(str(message.id))
                return
            await self._process_download(message, metadata, metadata['file_name'], dest_path)
        except Exception as e:
            logging.error(f"Error descargando archivo: {str(e)}", exc_info=True)

    async def process_topic(self, topic_id, limit=None):
        """Iterate the messages of one topic and download every document.

        Args:
            topic_id: Forum topic id passed as ``reply_to`` to the iterator.
            limit: Optional maximum number of messages to iterate.
        """
        try:
            logging.info(f"📂 Procesando topic ID: {topic_id}")
            # Use first client for message iteration
            client = self.multi_client.clients[0]
            async for message in client.iter_messages(
                TELEGRAM_GROUP_ID,
                limit=limit,
                reply_to=topic_id,
                wait_time=10  # Reduced wait time
            ):
                # download_file logs and swallows its own errors, so the
                # handlers below act as a safety net for this loop body.
                try:
                    if message.media and isinstance(message.media, MessageMediaDocument):
                        await self.download_file(message)
                        # Small delay between downloads to prevent rate limiting
                        await asyncio.sleep(0.5)
                except FloodWaitError as e:
                    wait_time = e.seconds + 5
                    logging.warning(f"⚠️ Flood wait detectado. Esperando {wait_time} segundos...")
                    await asyncio.sleep(wait_time)
                    continue
                except Exception as e:
                    logging.error(f"Error procesando mensaje: {str(e)}", exc_info=True)
                    continue
        except Exception as e:
            logging.error(f"Error procesando topic {topic_id}: {str(e)}", exc_info=True)

    async def process_all_topics(self):
        """Process every topic in ``TOPIC_IDS`` and log the cumulative session
        statistics after each one."""
        for topic_name, topic_id in TOPIC_IDS.items():
            logging.info(f"🎯 Iniciando procesamiento de: {topic_name}")
            await self.process_topic(topic_id)
            # Print session statistics
            if self.download_stats['total_files'] > 0:
                total_time = self.download_stats['total_time']
                # Guard against a zero accumulated duration.
                avg_speed = (self.download_stats['total_bytes'] / 1024 / 1024) / total_time if total_time > 0 else 0
                logging.info(f"📊 Estadísticas del topic {topic_name}:")
                logging.info(f" 📁 Archivos: {self.download_stats['total_files']}")
                logging.info(f" 💾 Total: {self.download_stats['total_bytes']/1024/1024/1024:.2f} GB")
                logging.info(f" ⚡ Velocidad promedio: {avg_speed:.2f} MB/s")
async def main():
    """Run the AES self-test, start the client pool, download every
    configured topic and always disconnect the clients afterwards.

    Exits with status 1 when the AES self-test fails.
    """
    # AES-IGE encrypt/decrypt round trip on random data.
    # NOTE(review): this only exercises the AES helper; it presumably also
    # succeeds when cryptg is not installed — confirm.
    try:
        plaintext = os.urandom(1024)
        aes_key = os.urandom(32)
        aes_iv = os.urandom(32)
        ciphertext = AES.encrypt_ige(plaintext, aes_key, aes_iv)
        if AES.decrypt_ige(ciphertext, aes_key, aes_iv) != plaintext:
            raise RuntimeError("❌ Cryptg does not work properly")
        logging.info("✅ cryptg available and working")
    except Exception as exc:
        logging.critical(f"❌ ERROR ON CRYPTG: {str(exc)}")
        raise SystemExit(1)

    # Session files and the processed-id log need their directories.
    for directory in ('session', '/app/data'):
        os.makedirs(directory, exist_ok=True)

    # Pool of three clients shared by all downloads.
    multi_client = MultiClientDownloader(
        TELEGRAM_API_ID,
        TELEGRAM_API_HASH,
        TELEGRAM_SESSION_NAME,
        num_clients=3,
    )

    try:
        logging.info("🚀 Inicializando clientes múltiples...")
        await multi_client.initialize_clients()

        downloader = TelegramDownloader(multi_client)

        logging.info("📥 Iniciando descarga de todos los topics...")
        await downloader.process_all_topics()

        logging.info("✅ Proceso completado exitosamente")
    except Exception as exc:
        logging.error(f"Error en main: {str(exc)}", exc_info=True)
    finally:
        # Runs on success and on failure alike.
        logging.info("🔌 Cerrando conexiones...")
        await multi_client.close_all_clients()


if __name__ == "__main__":
    asyncio.run(main())