You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1147 lines
47 KiB

# views.py
from django.shortcuts import render
import os
from django.http import JsonResponse
from django.conf import settings
from django.views.decorators.csrf import csrf_exempt
import json
from pathlib import Path
import shutil
import re
import subprocess
from django.db import connections
import django
from django.db.utils import OperationalError
from django.core.management import call_command
from gtts import gTTS
from pydub import AudioSegment
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'Dialbox_8.settings')
django.setup()
# Helper function to make safe filenames
def get_safe_filename(name):
return re.sub(r'[^a-zA-Z0-9_-]', '_', name).strip()
# Vista para renderizar la página principal
def create_tabs_flow(request):
return render(request, 'flow.html')
# Vista para listar todas las pestañas existentes
def list_tabs(request):
data_dir = os.path.join(settings.BASE_DIR, 'flow', 'data')
tabs = []
if os.path.exists(data_dir):
for filename in os.listdir(data_dir):
if filename.startswith('tab_') and filename.endswith('.json'):
# Extraer tabId y tabName del nombre del archivo
match = re.match(r'tab_(\d+)_(.+)\.json', filename)
if match:
tab_id = match.group(1)
file_path = os.path.join(data_dir, filename)
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
tab_name = data.get('tabName', f'Pestaña {tab_id}')
tabs.append({
'name': tab_name,
'id': tab_id
})
return JsonResponse({'tabs': tabs})
# Función para crear y guardar archivos de texto desde el contenido del node3
@csrf_exempt
def save_tts(request):
if request.method != 'POST':
return JsonResponse({'status': 'error', 'message': 'Solo se permiten solicitudes POST'}, status=405)
try:
data = json.loads(request.body)
tab_id = data.get('tabId')
tab_name = data.get('tabName')
node_id = data.get('nodeId') # Agregamos nodeId
node_type = data.get('nodeType')
title = data.get('title')
content = data.get('content')
audio_file = data.get('audioFile') # Obtenemos el nombre del archivo de audio
if not audio_file:
audio_file = 'saludo_claude' # Valor por defecto si no se proporciona
# Formatear el nombre del archivo como {node_id}_{node_title}.WAV
safe_title = get_safe_filename(title)
formatted_audio_file = f"{node_id}_{safe_title}.WAV"
file_name = f'{tab_name}.py'
py_filepath = os.path.join('/var/lib/asterisk/agi-bin/', file_name)
# Escapar llaves en el contenido
file_content = f"""#!/usr/bin/env python3
import speech_recognition as sr
import re
import tts
import subprocess
import os
from asterisk.agi import AGI
import anthropic
import json
agi = AGI()
class CallHandler:
def __init__(self):
self.audio_path = "/var/lib/asterisk/sounds/grabacion_usuario.wav"
self.tab_id = self.get_tab_id()
def get_tab_id(self):
# Obtener el tab_id desde una variable de entorno AGI
# Asegúrate de pasar esta variable desde el Dialplan
tab_id = agi.env.get('TAB_ID', None)
if not tab_id:
agi.verbose("TAB_ID no fue proporcionado. Utilizando 'default_tab_id'.", 1)
return 'default_tab_id'
return tab_id
def grabar_llamada(self):
callerid = agi.env.get('agi_callerid', '')
uniqueid = agi.env.get('agi_uniqueid', '')
# Registrar la información de la llamada
agi.verbose(f"CallerID: {{callerid}}", 1)
agi.verbose(f"UniqueID: {{uniqueid}}", 1)
# Iniciar la grabación
agi.verbose("Iniciando grabación...", 1)
try:
result = agi.record_file(
filename='/var/lib/asterisk/sounds/grabacion_usuario',
format='wav',
timeout=4000, # Puedes ajustar el tiempo de espera según tus necesidades
offset=0,
beep=True,
silence=0,
escape_digits='#'
)
except Exception as e:
agi.verbose(f"Error al grabar: {{e}}", 1)
else:
agi.verbose(f"Grabación completada. Resultado: {{result}}", 1)
self.detectar_audio()
def detectar_audio(self):
recognizer = sr.Recognizer()
with sr.AudioFile(self.audio_path) as source:
audio_data = recognizer.record(source)
try:
text = recognizer.recognize_google(audio_data, language="es-ES")
agi.verbose(f"La pregunta es: {{text}}", 1)
# Llamar a la función consultar_claude con el texto reconocido
respuesta = self.consultar_claude(text)
cleaned_text = re.sub(r'^(?:\\*+|\\d+)', '', respuesta).strip()
cleaned_text = cleaned_text.replace('%', '')
tts.convert_text_to_speech(cleaned_text)
ext = self.determine_extension(respuesta)
# Reproducir la respuesta generada por Claude
try:
tts.convert_text_to_speech(cleaned_text)
agi.stream_file('resp_claude')
except Exception as e:
agi.verbose(f"Error al convertir texto a voz: {{e}}", 1)
if ext is not None and ext != '':
try:
resultado_dial = agi.execute("EXEC Dial", f"Local/{{str(ext)}}@from-internal,20")
agi.verbose(f"Resultado del Dial: {{resultado_dial}}", 1)
except Exception as e:
agi.verbose(f"Error ejecutando Dial en AGI: {{e}}", 1)
else:
agi.verbose("La variable ext no tiene un valor válido.", 1)
agi.stream_file('algomas')
# Volver a grabar si es necesario
self.grabar_llamada()
except sr.RequestError as e:
agi.verbose(f"Error al conectarse con el servicio de reconocimiento de voz: {{e}}", 1)
except sr.UnknownValueError:
agi.verbose("No se pudo entender el audio", 1)
agi.stream_file('no_entendi')
self.grabar_llamada()
def determine_extension(self, input_str):
import unicodedata
def normalize_str(s):
return ''.join(
c for c in unicodedata.normalize('NFD', s)
if unicodedata.category(c) != 'Mn'
).lower()
agi.verbose(self.tab_id)
# Definir la ruta al archivo de condiciones basado en tab_id
conditions_file = f"/home/Flow_IA/IVR/tab_"""+ tab_id +"""_conditions.json"
first_word = input_str.split()[0] if input_str.split() else ''
try:
with open(conditions_file, 'r', encoding='utf-8') as f:
conditions_data = json.load(f)
# Iterar sobre todas las condiciones de node3
for node_cond in conditions_data.get('node3_conditions', []):
for cond in node_cond.get('conditions', []):
condition_str = cond.get('condition_string', '').strip()
extension_number = cond.get('extensionNumber', '').strip()
if first_word == normalize_str(condition_str):
agi.stream_file('resp_claude')
agi.verbose(f"Condición encontrada: {{condition_str}} -> Extensión: {{extension_number}}", 1)
try:
resultado_dial = agi.execute("EXEC Dial", f"Local/{str(extension_number)}@from-internal,20")
agi.verbose(f"Resultado del Dial: {resultado_dial}", 1)
except Exception as e:
agi.verbose(f"Error ejecutando Dial en AGI: {e}")
except Exception as e:
agi.verbose(f"Error al leer o procesar el archivo de condiciones: {{e}}", 1)
# Si no se encuentra ninguna condición que coincida
agi.verbose("No se encontró una condición que coincida con la entrada del usuario.", 1)
return None
def consultar_claude(self, text):
# Leer extensiones (este comando parece específico para tu configuración)
command = """ + '"""grep -E \'^\[|^callerid=\' /etc/asterisk/pjsip.endpoint.conf | awk \'{ \
if ($0 ~ /^\\[/) { \
gsub(/[\\[\\]]/, "", $1); \
ext = $1; \
} else if ($0 ~ /^callerid=/) { \
split($0, arr, "="); \
callerid = arr[2]; \
printf "Extension: " ext ", CallerID: " callerid "; "; \
} \
}\'"""' + """
result = subprocess.run(command, shell=True, capture_output=True, text=True)
output = result.stdout.strip()
# Crear el cliente Anthropic pasando la API key
client = anthropic.Anthropic(api_key='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')
message = client.messages.create(
model="claude-3-5-haiku-20241022",
max_tokens=200,
temperature=0,
system=(
""" + f"'''{content}'''" + """
),
messages=[
{
"role": "user",
"content": [
{
"type": "text",
"text": text
}
]
}
]
)
respuesta_claude = "".join([msg.text for msg in message.content])
respuesta_claude = respuesta_claude.replace('%', '')
agi.verbose(f"Respuesta de Claude: {respuesta_claude}")
return respuesta_claude
if __name__ == "__main__":
agi.answer()
agi.stream_file('""" + audio_file + """')
call_handler = CallHandler()
call_handler.grabar_llamada()
"""
try:
with open(py_filepath, 'w', encoding='utf-8') as file:
file.write(file_content)
os.chmod(py_filepath, 0o755)
subprocess.run(['sudo', 'chown', 'asterisk:asterisk', py_filepath], check=True)
except Exception as e:
return JsonResponse({'status': 'error', 'message': f'Error al crear el archivo: {e}'}, status=500)
return JsonResponse({
'status': 'success',
'filePath': py_filepath
})
except Exception as e:
return JsonResponse({'status': 'error', 'message': str(e)}, status=500)
# Vista para guardar datos de la pestaña
@csrf_exempt
def save_tab_data(request):
if request.method != 'POST':
return JsonResponse({'error': 'Solo se permiten solicitudes POST'}, status=405)
try:
data = json.loads(request.body)
tab_id = data.get('tabId')
tab_name = data.get('tabName') # Nombre de la pestaña
nodes = data.get('nodes')
connections = data.get('connections')
if not tab_id or nodes is None or connections is None or not tab_name:
return JsonResponse({'error': 'Datos incompletos'}, status=400)
# Sanitizar el nombre de la pestaña
safe_tab_name = get_safe_filename(tab_name)
# Formato del nombre del archivo: tab_<tabId>_<safe_tab_name>.json
filename = f'tab_{tab_id}_{safe_tab_name}.json'
file_path = os.path.join(settings.BASE_DIR, 'flow', 'data', filename)
# Filtrar los datos para cada tipo de nodo
filtered_nodes = []
for node in nodes:
node_type = node.get('nodeType')
if node_type == 'node1':
filtered_nodes.append({
'id': node.get('id'),
'title': node.get('title'),
'extensionNumber': node.get('extensionNumber'),
'position': node.get('position'),
'nodeType': node_type,
})
elif node_type == 'node2':
audio_file = node.get('audioFile', "")
if audio_file:
# Asegúrate de que audio_file sea la URL relativa
audio_file_url = audio_file # "/media/tabs/tab_<tab_id>/audio_files/<filename>.wav"
else:
audio_file_url = ""
filtered_nodes.append({
'id': node.get('id'),
'title': node.get('title'),
'audioFile': audio_file_url,
'position': node.get('position'),
'nodeType': node_type,
})
elif node_type == 'node3':
filtered_node = {
'id': node.get('id'),
'title': node.get('title'),
'content': node.get('content'),
'position': node.get('position'),
'nodeType': node_type,
'textFile': node.get('textFile', ""),
'conditions': node.get('conditions', []) # Añadido para condiciones dinámicas
}
filtered_nodes.append(filtered_node)
elif node_type == 'node4':
filtered_nodes.append({
'id': node.get('id'),
'title': node.get('title'),
'position': node.get('position'),
'nodeType': node_type,
'extensionNumber': node.get('extensionNumber', "") # Incluido extensionNumber
})
elif node_type == 'node6':
audio_file = node.get('audioFile', "")
if audio_file:
# Verificar si audio_file ya contiene la ruta relativa
if not audio_file.startswith('/media/'):
# Si solo es el nombre del archivo, añadir la ruta relativa
audio_file_url = os.path.join('/media/tabs/tab_{}/audio_files/'.format(tab_id), audio_file).replace('\\', '/')
else:
audio_file_url = audio_file # Ya tiene la ruta correcta
else:
audio_file_url = ""
filtered_nodes.append({
'id': node.get('id'),
'title': node.get('title'),
'content': node.get('content', ""),
'audioFile': audio_file_url,
'position': node.get('position'),
'nodeType': node_type,
})
else:
# Otros tipos de nodos
filtered_nodes.append(node)
# Asegurarse de que el directorio de datos existe
data_dir = os.path.join(settings.BASE_DIR, 'flow', 'data')
os.makedirs(data_dir, exist_ok=True)
# Guardar el archivo .json con los datos filtrados
with open(file_path, 'w', encoding='utf-8') as f:
json.dump({'tabName': tab_name, 'nodes': filtered_nodes, 'connections': connections}, f, ensure_ascii=False, indent=4)
# Extraer condiciones de node3 y crear archivo .json en /home/Flow_IA/IVR/
ivr_dir = '/home/Flow_IA/IVR/'
# Asegurarse de que el directorio exista
os.makedirs(ivr_dir, exist_ok=True)
# Preparar la estructura de datos para las condiciones
conditions_data = {
'tabId': tab_id,
'tabName': tab_name,
'node3_conditions': []
}
# Crear un diccionario de nodos para acceso rápido
node_dict = {node['id']: node for node in filtered_nodes}
# Crear un diccionario de tipos de nodos
node_types = {node['id']: node['nodeType'] for node in filtered_nodes}
for node in filtered_nodes:
if node['nodeType'] == 'node3':
node_info = {
'nodeId': node['id'],
'title': node['title'],
'conditions': []
}
# Obtener las condiciones y asociarlas con node4
for condition in node.get('conditions', []):
condition_str = condition.get('condition_string', '').strip()
target_node_id = condition.get('target_node_id', '').strip()
extension_number = ""
# Verificar si el target_node_id corresponde a un node4
if node_types.get(target_node_id) == 'node4':
node4 = node_dict.get(target_node_id)
if node4:
extension_number = node4.get('extensionNumber', "")
node_info['conditions'].append({
'condition_string': condition_str,
'extensionNumber': extension_number
})
conditions_data['node3_conditions'].append(node_info)
# Definir el nombre del archivo, por ejemplo: tab_<tab_id>_conditions.json
conditions_filename = f'tab_{tab_id}_conditions.json'
conditions_file_path = os.path.join(ivr_dir, conditions_filename)
# Guardar las condiciones en el archivo .json
with open(conditions_file_path, 'w', encoding='utf-8') as json_file:
json.dump(conditions_data, json_file, ensure_ascii=False, indent=4)
return JsonResponse({'message': 'Datos guardados exitosamente'})
except Exception as e:
return JsonResponse({'error': str(e)}, status=500)
# Vista para cargar los datos de una pestaña específica
def load_tab_data(request, tab_id):
try:
data_dir = os.path.join(settings.BASE_DIR, 'flow', 'data')
# Buscar el archivo JSON correspondiente a la tab_id
matching_files = [f for f in os.listdir(data_dir) if f.startswith(f'tab_{tab_id}_') and f.endswith('.json')]
if not matching_files:
return JsonResponse({'nodes': [], 'connections': [], 'tabName': f'Pestaña {tab_id}'})
file_path = os.path.join(data_dir, matching_files[0])
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Asegurarse de que node1, node3 y node4 no tengan audioFile
for node in data.get('nodes', []):
node_type = node.get('nodeType')
if node_type in ['node1', 'node3', 'node4']:
node.pop('audioFile', None)
# Para node4, no hay 'receivedString' sino 'extensionNumber', asegurarse de manejar correctamente
return JsonResponse(data)
except Exception as e:
logger.error(f"Error en load_tab_data: {e}")
return JsonResponse({'error': str(e)}, status=500)
# Nueva Vista para eliminar archivos de Node3 cuando se actualiza
@csrf_exempt
def delete_files(request):
if request.method != 'POST':
return JsonResponse({'status': 'error', 'message': 'Solo se permiten solicitudes POST'}, status=405)
try:
data = json.loads(request.body)
tab_id = data.get('tabId')
node_id = data.get('nodeId') # Agregamos nodeId
node_type = data.get('nodeType')
old_title = data.get('oldTitle')
if not tab_id or not node_id or not node_type or not old_title:
return JsonResponse({'status': 'error', 'message': 'Datos incompletos'}, status=400)
# Procesar solo si el nodo es node3
if node_type != "node3":
return JsonResponse({'status': 'error', 'message': 'Este nodo no tiene archivos para eliminar'}, status=200)
tab_dir = os.path.join(settings.MEDIA_ROOT, 'tabs', f'tab_{tab_id}')
text_dir = os.path.join(tab_dir, 'text_files')
os.makedirs(text_dir, exist_ok=True)
# Generar nombres de archivo basados en el antiguo título, node_type y node_id
safe_old_title = "".join([c if c.isalnum() else "_" for c in old_title])
filename_base = f"{safe_old_title}_{node_type}_{node_id}"
txt_filename = f"{filename_base}.txt"
txt_filepath = os.path.join(text_dir, txt_filename)
# Eliminar el archivo de texto si existe
if os.path.exists(txt_filepath):
os.remove(txt_filepath)
return JsonResponse({'status': 'success', 'message': 'Archivo de texto eliminado correctamente'})
except Exception as e:
return JsonResponse({'status': 'error', 'message': str(e)}, status=500)
# Vista para subir audio para Node2
@csrf_exempt
def upload_audio(request):
if request.method != 'POST':
return JsonResponse({'status': 'error', 'message': 'Solo se permiten solicitudes POST'}, status=405)
try:
tab_id = request.POST.get('tabId')
node_id = request.POST.get('nodeId') # Agregamos nodeId
node_type = request.POST.get('nodeType')
title = request.POST.get('title')
audio_file = request.FILES.get('audioFile')
if not tab_id or not node_id or not node_type or not title or not audio_file:
return JsonResponse({'status': 'error', 'message': 'Datos incompletos'}, status=400)
# Procesar solo si el nodo es node2
if node_type != "node2":
return JsonResponse({'status': 'error', 'message': 'Este endpoint solo es para node2'}, status=400)
# Directorios específicos para la pestaña
tab_dir = os.path.join(settings.MEDIA_ROOT, 'tabs', f'tab_{tab_id}')
audio_dir = os.path.join(tab_dir, 'audio_files')
os.makedirs(audio_dir, exist_ok=True)
# Generar nombre único para el archivo de audio usando node_id
safe_title = "".join([c if c.isalnum() else "_" for c in title])
filename_base = f"{safe_title}_{node_type}_{node_id}"
wav_filename = f"{filename_base}.wav"
wav_filepath = os.path.join(audio_dir, wav_filename)
# Guardar el archivo de audio en /media/tabs
with open(wav_filepath, 'wb+') as destination:
for chunk in audio_file.chunks():
destination.write(chunk)
# Copiar el archivo a /var/spool/asterisk/recordings
asterisk_dir = '/var/lib/asterisk/sounds'
os.makedirs(asterisk_dir, exist_ok=True) # Asegurarse de que el directorio exista
asterisk_filepath = os.path.join(asterisk_dir, wav_filename)
shutil.copy(wav_filepath, asterisk_filepath)
# Generar URL del archivo
audio_file_url = os.path.join(settings.MEDIA_URL, 'tabs', f'tab_{tab_id}', 'audio_files', wav_filename)
return JsonResponse({
'status': 'success',
'audioFile': audio_file_url,
'asteriskFile': asterisk_filepath # Confirmación de la ruta en Asterisk
})
except Exception as e:
return JsonResponse({'status': 'error', 'message': str(e)}, status=500)
# Vista para eliminar audio de Node2 y Node6
@csrf_exempt
def delete_audio(request):
if request.method != 'POST':
return JsonResponse({'status': 'error', 'message': 'Solo se permiten solicitudes POST'}, status=405)
try:
data = json.loads(request.body)
tab_id = data.get('tabId')
node_id = data.get('nodeId') # Agregamos nodeId
node_type = data.get('nodeType')
old_audio_file = data.get('oldAudioFile')
if not tab_id or not node_id or not node_type or not old_audio_file:
return JsonResponse({'status': 'error', 'message': 'Datos incompletos'}, status=400)
# Procesar solo si el nodo es node2 o node6
if node_type not in ["node2", "node6"]:
return JsonResponse({'status': 'error', 'message': 'Este endpoint solo es para node2 o node6'}, status=400)
# Obtener la ruta completa del archivo de audio
audio_file_path = os.path.join(settings.BASE_DIR, old_audio_file.replace(settings.MEDIA_URL, ''))
if os.path.exists(audio_file_path):
os.remove(audio_file_path)
return JsonResponse({'status': 'success', 'message': 'Archivo de audio eliminado correctamente'})
except Exception as e:
return JsonResponse({'status': 'error', 'message': str(e)}, status=500)
# Vista para renombrar audio de Node2 y Node6
@csrf_exempt
def rename_audio(request):
"""
Renombra el archivo de audio asociado a node2 o node6.
"""
if request.method != 'POST':
return JsonResponse({'status': 'error', 'message': 'Solo se permiten solicitudes POST'}, status=405)
try:
data = json.loads(request.body)
tab_id = data.get('tabId')
node_id = data.get('nodeId')
node_type = data.get('nodeType')
old_title = data.get('oldTitle')
new_title = data.get('newTitle')
current_audio_file = data.get('currentAudioFile')
if not tab_id or not node_id or not node_type or not old_title or not new_title or not current_audio_file:
return JsonResponse({'status': 'error', 'message': 'Datos incompletos'}, status=400)
# Procesar solo si el nodo es node2 o node6
if node_type not in ["node2", "node6"]:
return JsonResponse({'status': 'error', 'message': 'Este endpoint solo es para node2 o node6'}, status=400)
# Directorios específicos para la pestaña
audio_dir = os.path.join(settings.MEDIA_ROOT, 'tabs', f'tab_{tab_id}', 'audio_files')
os.makedirs(audio_dir, exist_ok=True)
# Generar nombres de archivo basados en los títulos antiguos y nuevos y node_id
safe_old_title = get_safe_filename(old_title)
safe_new_title = get_safe_filename(new_title)
old_filename_base = f"{node_id}_{safe_old_title}"
new_filename_base = f"{node_id}_{safe_new_title}"
old_wav_filename = f"{old_filename_base}.wav"
new_wav_filename = f"{new_filename_base}.wav"
old_wav_filepath = os.path.join(audio_dir, old_wav_filename)
new_wav_filepath = os.path.join(audio_dir, new_wav_filename)
# Renombrar el archivo de audio
if os.path.exists(old_wav_filepath):
os.rename(old_wav_filepath, new_wav_filepath)
# Copiar el archivo renombrado a Asterisk
asterisk_dir = '/var/lib/asterisk/sounds'
asterisk_filepath = os.path.join(asterisk_dir, new_wav_filename)
shutil.copy(new_wav_filepath, asterisk_filepath)
# Generar URL del nuevo archivo
new_audio_file_url = os.path.join(settings.MEDIA_URL, 'tabs', f'tab_{tab_id}', 'audio_files', new_wav_filename)
new_audio_file_url = new_audio_file_url.replace('\\', '/') # Asegurar la barra correcta en Windows
return JsonResponse({
'status': 'success',
'newAudioFile': new_audio_file_url # Ruta relativa
})
else:
return JsonResponse({'status': 'error', 'message': 'Archivo de audio original no encontrado'}, status=404)
except Exception as e:
return JsonResponse({'status': 'error', 'message': str(e)}, status=500)
# Vista para renombrar una pestaña
@csrf_exempt
def rename_tab(request):
"""
Renombra una pestaña específica.
"""
if request.method != 'POST':
return JsonResponse({'status': 'error', 'message': 'Solo se permiten solicitudes POST'}, status=405)
try:
data = json.loads(request.body)
tab_id = data.get('tabId')
new_tab_name = data.get('newTabName')
if not tab_id or not new_tab_name:
return JsonResponse({'status': 'error', 'message': 'Datos incompletos'}, status=400)
safe_new_tab_name = get_safe_filename(new_tab_name)
data_dir = os.path.join(settings.BASE_DIR, 'flow', 'data')
# Buscar el archivo JSON correspondiente a la tab_id
matching_files = [f for f in os.listdir(data_dir) if f.startswith(f'tab_{tab_id}_') and f.endswith('.json')]
if not matching_files:
return JsonResponse({'status': 'error', 'message': 'Archivo de pestaña no encontrado'}, status=404)
old_filename = matching_files[0]
old_file_path = os.path.join(data_dir, old_filename)
# Nuevo nombre de archivo
new_filename = f'tab_{tab_id}_{safe_new_tab_name}.json'
new_file_path = os.path.join(data_dir, new_filename)
# Renombrar el archivo
os.rename(old_file_path, new_file_path)
# Actualizar el tabName dentro del JSON
with open(new_file_path, 'r', encoding='utf-8') as f:
tab_data = json.load(f)
tab_data['tabName'] = new_tab_name
with open(new_file_path, 'w', encoding='utf-8') as f:
json.dump(tab_data, f, ensure_ascii=False, indent=4)
return JsonResponse({'status': 'success', 'message': 'Pestaña renombrada exitosamente'})
except Exception as e:
return JsonResponse({'status': 'error', 'message': str(e)}, status=500)
# Vista para actualizar Node1 (título y extensión)
@csrf_exempt
def update_node1(request):
"""
Actualiza el título y número de extensión de un node1 específico.
"""
if request.method != 'POST':
return JsonResponse({'status': 'error', 'message': 'Solo se permiten solicitudes POST'}, status=405)
try:
data = json.loads(request.body)
tab_id = data.get('tabId')
tab_name = data.get('tabName')
node_id = data.get('nodeId')
node_type = data.get('nodeType')
new_title = data.get('newTitle')
new_extension = data.get('newExtension')
old_title = data.get('oldTitle')
# Directorio de configuración de Asterisk para extensiones personalizadas
directory = '/etc/asterisk/extensions_custom.conf'
file_content = f"""
exten => {new_extension},1,Answer()
exten => {new_extension},n,AGI(/var/lib/asterisk/agi-bin/{tab_name}.py)
exten => {new_extension},n,Hangup()
"""
try:
shutil.copyfile(directory, f"{directory}.bak") # Crear una copia de seguridad
with open(directory, 'a') as conf_file:
conf_file.write(file_content)
except Exception as e:
logger.error(f"No fue posible crear o escribir en el archivo {directory}: {e}")
if not tab_id or not node_id or not node_type or not new_title or not new_extension or not old_title:
return JsonResponse({'status': 'error', 'message': 'Datos incompletos'}, status=400)
# Procesar solo si el nodo es node1
if node_type != "node1":
return JsonResponse({'status': 'error', 'message': 'Este endpoint solo es para node1'}, status=400)
# Directorios específicos para la pestaña
data_dir = os.path.join(settings.BASE_DIR, 'flow', 'data')
# Buscar el archivo JSON correspondiente a la pestaña
matching_files = [f for f in os.listdir(data_dir) if re.match(rf'tab_{tab_id}_.+\.json', f)]
if not matching_files:
return JsonResponse({'status': 'error', 'message': 'Archivo de pestaña no encontrado'}, status=404)
file_path = os.path.join(data_dir, matching_files[0])
with open(file_path, 'r', encoding='utf-8') as f:
tab_data = json.load(f)
# Buscar el nodo correspondiente y actualizar sus datos
for node in tab_data.get('nodes', []):
if node['id'] == node_id and node['nodeType'] == 'node1':
node['title'] = new_title
node['extensionNumber'] = new_extension
break
# Guardar el archivo .json actualizado
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(tab_data, f, ensure_ascii=False, indent=4)
return JsonResponse({
'status': 'success',
'message': 'Nodo1 actualizado exitosamente'
})
except Exception as e:
return JsonResponse({'status': 'error', 'message': str(e)}, status=500)
# Vista para eliminar un nodo
@csrf_exempt
def delete_node(request):
if request.method != 'POST':
return JsonResponse({'status': 'error', 'message': 'Solo se permiten solicitudes POST'}, status=405)
try:
data = json.loads(request.body)
tab_id = data.get('tabId')
node_id = data.get('nodeId')
node_type = data.get('nodeType')
if not tab_id or not node_id or not node_type:
return JsonResponse({'status': 'error', 'message': 'Datos incompletos'}, status=400)
# Cargar el archivo de datos de la pestaña
data_dir = os.path.join(settings.BASE_DIR, 'flow', 'data')
# Buscar el archivo JSON correspondiente a la pestaña
matching_files = [f for f in os.listdir(data_dir) if f.startswith(f'tab_{tab_id}_') and f.endswith('.json')]
if not matching_files:
return JsonResponse({'status': 'error', 'message': 'Archivo de pestaña no encontrado'}, status=404)
file_path = os.path.join(data_dir, matching_files[0])
with open(file_path, 'r', encoding='utf-8') as f:
tab_data = json.load(f)
# Obtener los datos del nodo antes de eliminarlo
node_data = next((n for n in tab_data.get('nodes', []) if n['id'] == node_id), None)
if not node_data:
return JsonResponse({'status': 'error', 'message': 'Nodo no encontrado en la pestaña'}, status=404)
node_title = node_data.get('title', '')
# Eliminar el nodo y sus conexiones
tab_data['nodes'] = [node for node in tab_data.get('nodes', []) if node['id'] != node_id]
tab_data['connections'] = [conn for conn in tab_data.get('connections', [])
if conn['startNodeId'] != node_id and conn['endNodeId'] != node_id]
# Si es un nodo3, nodo2 o nodo6, manejar eliminación de archivos
if node_type == 'node3':
# Lógica para eliminar archivos de texto
text_dir = os.path.join(settings.MEDIA_ROOT, 'tabs', f'tab_{tab_id}', 'text_files')
safe_title = "".join([c if c.isalnum() else "_" for c in node_title])
txt_filename = f"{safe_title}_node3_{node_id}.txt"
txt_filepath = os.path.join(text_dir, txt_filename)
if os.path.exists(txt_filepath):
os.remove(txt_filepath)
elif node_type in ['node2', 'node6']:
# Lógica para eliminar archivos de audio
audio_dir = os.path.join(settings.MEDIA_ROOT, 'tabs', f'tab_{tab_id}', 'audio_files')
safe_title = "".join([c if c.isalnum() else "_" for c in node_title])
wav_filename = f"{node_id}_{safe_title}.WAV"
wav_filepath = os.path.join(audio_dir, wav_filename)
if os.path.exists(wav_filepath):
os.remove(wav_filepath)
# Guardar los datos actualizados
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(tab_data, f, ensure_ascii=False, indent=4)
return JsonResponse({'status': 'success', 'message': 'Nodo y sus conexiones eliminados exitosamente'})
except Exception as e:
return JsonResponse({'status': 'error', 'message': str(e)}, status=500)
# Vista para eliminar una pestaña
@csrf_exempt
def delete_tab(request):
if request.method != 'POST':
return JsonResponse({'status': 'error', 'message': 'Solo se permiten solicitudes POST'}, status=405)
try:
data = json.loads(request.body)
tab_id = data.get('tabId')
tab_name = data.get('tabName')
if not tab_id or not tab_name:
return JsonResponse({'status': 'error', 'message': 'Datos incompletos'}, status=400)
# Eliminar el archivo de datos de la pestaña
data_dir = os.path.join(settings.BASE_DIR, 'flow', 'data')
# Buscar el archivo JSON correspondiente a la pestaña
matching_files = [f for f in os.listdir(data_dir) if f.startswith(f'tab_{tab_id}_') and f.endswith('.json')]
if not matching_files:
return JsonResponse({'status': 'error', 'message': 'Archivo de pestaña no encontrado'}, status=404)
file_path = os.path.join(data_dir, matching_files[0])
os.remove(file_path)
# Eliminar los archivos de medios asociados (audio_files y text_files)
media_dir = os.path.join(settings.MEDIA_ROOT, 'tabs', f'tab_{tab_id}')
if os.path.exists(media_dir):
shutil.rmtree(media_dir)
# Eliminar el archivo de condiciones JSON
ivr_dir = '/home/Flow_IA/IVR/'
conditions_filename = f'tab_{tab_id}_conditions.json'
conditions_file_path = os.path.join(ivr_dir, conditions_filename)
if os.path.exists(conditions_file_path):
os.remove(conditions_file_path)
return JsonResponse({'status': 'success', 'message': 'Pestaña eliminada exitosamente'})
except Exception as e:
return JsonResponse({'status': 'error', 'message': str(e)}, status=500)
# Vista para actualizar Node4
@csrf_exempt
def update_node4(request):
if request.method != 'POST':
return JsonResponse({'status': 'error', 'message': 'Solo se permiten solicitudes POST'}, status=405)
try:
data = json.loads(request.body)
tab_id = data.get('tabId')
tab_name = data.get('tabName')
node_id = data.get('nodeId')
node_type = data.get('nodeType')
extension_number = data.get('extensionNumber') # Renombrado de receivedString a extensionNumber
if not tab_id or not node_id or not node_type or extension_number is None:
return JsonResponse({'status': 'error', 'message': 'Datos incompletos'}, status=400)
if node_type != "node4":
return JsonResponse({'status': 'error', 'message': 'Este endpoint solo es para node4'}, status=400)
# Guardar el número de extensión en el archivo de la pestaña
data_dir = os.path.join(settings.BASE_DIR, 'flow', 'data')
# Buscar el archivo JSON correspondiente a la pestaña
matching_files = [f for f in os.listdir(data_dir) if re.match(rf'tab_{tab_id}_.+\.json', f)]
if not matching_files:
return JsonResponse({'status': 'error', 'message': 'Archivo de pestaña no encontrado'}, status=404)
file_path = os.path.join(data_dir, matching_files[0])
with open(file_path, 'r', encoding='utf-8') as f:
tab_data = json.load(f)
# Buscar el node4 correspondiente y actualizar su extensionNumber
node_found = False
for node in tab_data.get('nodes', []):
if node['id'] == node_id and node['nodeType'] == 'node4':
node['extensionNumber'] = extension_number
node_found = True
break
if not node_found:
return JsonResponse({'status': 'error', 'message': 'node4 no encontrado'}, status=404)
# Guardar el archivo .json actualizado
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(tab_data, f, ensure_ascii=False, indent=4)
return JsonResponse({'status': 'success', 'message': 'node4 actualizado exitosamente'})
except Exception as e:
return JsonResponse({'status': 'error', 'message': str(e)}, status=500)
# Vista para generar audios por medio del TTS
@csrf_exempt
def tts_module(request, language="es"):
"""
Genera un archivo de audio a partir de un texto utilizando TTS y lo guarda en el directorio correspondiente.
"""
if request.method != 'POST':
return JsonResponse({'status': 'error', 'message': 'Solo se permiten solicitudes POST'}, status=405)
try:
data = json.loads(request.body)
tab_id = data.get('tabId')
title = data.get('title')
text = data.get("text")
node_id = data.get("nodeId")
node_type = data.get("nodeType")
if not tab_id or not title or not text or not node_id or not node_type:
return JsonResponse({'status': 'error', 'message': 'Datos incompletos'}, status=400)
# Directorios específicos para la pestaña
audio_dir = os.path.join(settings.MEDIA_ROOT, 'tabs', f'tab_{tab_id}', 'audio_files')
os.makedirs(audio_dir, exist_ok=True)
# Generar nombre único para el archivo de audio usando node_id y node_title
wav_filename = f"{title}_{node_type}_{node_id}.wav" # Estructura de nombre {node_id}_{node_title}.wav
wav_filepath = os.path.join(audio_dir, wav_filename)
# Convertir texto a voz con el idioma seleccionado
tts_obj = gTTS(text, lang=language, slow=False)
speech_file_path = Path("speech.mp3")
tts_obj.save(speech_file_path)
# Ajustar la velocidad del audio
speed = 1.1 # Velocidad fija
audio = AudioSegment.from_file(speech_file_path)
faster_audio = audio._spawn(audio.raw_data, overrides={
"frame_rate": int(audio.frame_rate * speed)
}).set_frame_rate(audio.frame_rate)
# Convertir a formato WAV a 8000 kHz
faster_audio = faster_audio.set_frame_rate(8000)
faster_audio.export(wav_filepath, format="wav")
os.remove(speech_file_path)
asterisk_dir = '/var/lib/asterisk/sounds'
os.makedirs(asterisk_dir, exist_ok=True)
asterisk_filepath = os.path.join(asterisk_dir, wav_filename)
shutil.copy(wav_filepath, asterisk_filepath)
# Generar URL del archivo
audio_file_url = os.path.join(settings.MEDIA_URL, 'tabs', f'tab_{tab_id}', 'audio_files', wav_filename)
return JsonResponse({'status': 'success', 'audioFile': audio_file_url, 'message': 'Audio saved successfully'})
except Exception as e:
return JsonResponse({'status': 'error', 'message': str(e)}, status=500)
def apply_migrations_to_dynamic_db():
try:
call_command('migrate', database='dynamic')
print("Migraciones aplicadas con éxito a la base de datos dinámica.")
except Exception as e:
raise Exception(f"Error al aplicar migraciones: {str(e)}")
DB_ENGINES = {
'mysql': 'django.db.backends.mysql',
'mssql': 'sql_server.pyodbc',
'postgresql': 'django.db.backends.postgresql',
'oracle': 'django.db.backends.oracle',
}
def configure_dynamic_connection(db_config, db_choice):
alias = 'dynamic'
# Se verifica si existe una conexión y en caso de haberla, esta se cierra
if alias in connections.databases:
connections[alias].close() # Cierra la conexión activa
del connections.databases[alias] # Elimina la configuración de la conexión
# Configurar opciones específicas del motor
if db_choice == 'mssql':
db_config['OPTIONS'] = {'driver': 'ODBC Driver 17 for SQL Server'}
elif db_choice == 'mysql':
db_config['OPTIONS'] = {'init_command': "SET sql_mode='STRICT_TRANS_TABLES'"}
# Registrar la configuración en el alias 'dynamic'
connections.databases[alias] = {
'ENGINE': db_config['ENGINE'],
'NAME': db_config['NAME'],
'USER': db_config['USER'],
'PASSWORD': db_config['PASSWORD'],
'HOST': db_config['HOST'],
'PORT': db_config['PORT'],
'OPTIONS': db_config.get('OPTIONS', {}),
'ATOMIC_REQUESTS': db_config.get('ATOMIC_REQUESTS', False),
'CONN_HEALTH_CHECKS' : db_config.get('CONN_HEALTH_CHECKS'),
'CONN_MAX_AGE': db_config.get('CONN_MAX_AGE', 0),
'TIME_ZONE': db_config.get('TIME_ZONE', settings.TIME_ZONE),
'AUTOCOMMIT': db_config.get('AUTOCOMMIT', True),
}
# Probar la conexión
try:
with connections[alias].cursor() as cursor:
cursor.execute("SELECT 1")
except OperationalError as e:
raise OperationalError(f"Error al conectar a la BD dinámica: {str(e)}")
@csrf_exempt
def bd_connection(request):
if request.method == 'POST':
try:
data = json.loads(request.body)
# Validar los datos del formulario
required_fields = ['bd', 'name_bd', 'user_bd', 'password_bd', 'host_bd', 'port_bd']
if not all(field in data for field in required_fields):
return JsonResponse({'status': 'error', 'message': 'Faltan campos requeridos'}, status=400)
# Cerrar conexiones anteriores
if 'dynamic' in connections.databases:
connections['dynamic'].close()
del connections.databases['dynamic']
# Determinar el motor
db_choice = data['bd']
engine = DB_ENGINES.get(db_choice)
if not engine:
return JsonResponse({'status': 'error', 'message': f'Motor no soportado: {db_choice}'}, status=400)
# Configurar conexión dinámica
db_config = {
'ENGINE': engine,
'NAME': data['name_bd'],
'USER': data['user_bd'],
'PASSWORD': data['password_bd'],
'HOST': data['host_bd'],
'PORT': data['port_bd'],
'ATOMIC_REQUESTS': False,
'TIME_ZONE': settings.TIME_ZONE,
'CONN_HEALTH_CHECKS': False,
'CONN_MAX_AGE': None,
'OPTIONS': {},
'AUTOCOMMIT': True,
}
# Configurar y aplicar migraciones
configure_dynamic_connection(db_config, db_choice)
#apply_migrations_to_dynamic_db()
#configure_session_backend('dynamic')
request.session['dynamic_db_config'] = db_config
request.session['db_choice'] = db_choice
return JsonResponse({'status': 'success', 'message': 'Conexión exitosa'})
except OperationalError as e:
return JsonResponse({'status': 'error', 'message': str(e)}, status=400)
except Exception as e:
return JsonResponse({'status': 'error', 'message': 'Error inesperado', 'details': str(e)}, status=500)
return JsonResponse({'error': 'Método no permitido'}, status=405)
def convert_bytes_to_str(value):
"""Convierte valores bytes a string o hexadecimal."""
if isinstance(value, bytes):
try:
return value.decode('utf-8') # Intenta decodificar como texto
except UnicodeDecodeError:
return value.hex() # Si falla, convierte a hexadecimal
return value
@csrf_exempt
def bd_query(request):
if request.method == 'POST':
dynamic_config = request.session.get('dynamic_db_config')
db_choice = request.session.get('db_choice')
if not dynamic_config:
return JsonResponse({'status': 'error', 'message': 'No hay conexión configurada.'}, status=400)
try:
# Restaurar conexión si es necesario
configure_dynamic_connection(dynamic_config, db_choice)
data = json.loads(request.body)
query = data.get('query')
if not query:
return JsonResponse({'status': 'error', 'message': 'No se proporcionó una consulta.'}, status=400)
# Ejecutar consulta en la conexión dinámica
with connections['dynamic'].cursor() as cursor:
cursor.execute(query)
columns = [col[0] for col in cursor.description] # Obtener nombres de columnas
result = cursor.fetchall()
# Procesar los resultados para manejar valores bytes
processed_result = [
{columns[i]: convert_bytes_to_str(row[i]) for i in range(len(columns))}
for row in result
]
return JsonResponse({'status': 'success', 'data': processed_result, 'columns': columns})
except Exception as e:
return JsonResponse({'status': 'error', 'message': str(e), 'details': db_choice}, status=400)
return JsonResponse({'error': 'Método no permitido'}, status=405)