You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1147 lines
47 KiB
1147 lines
47 KiB
# views.py
|
|
|
|
from django.shortcuts import render
|
|
import os
|
|
from django.http import JsonResponse
|
|
from django.conf import settings
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
import json
|
|
from pathlib import Path
|
|
import shutil
|
|
import re
|
|
import subprocess
|
|
from django.db import connections
|
|
import django
|
|
from django.db.utils import OperationalError
|
|
from django.core.management import call_command
|
|
|
|
from gtts import gTTS
|
|
from pydub import AudioSegment
|
|
|
|
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'Dialbox_8.settings')
|
|
django.setup()
|
|
|
|
# Helper function to make safe filenames
|
|
def get_safe_filename(name):
    """Return ``name`` with every character outside ``[A-Za-z0-9_-]`` replaced by ``_``.

    Whitespace is replaced as well, so the final ``strip()`` never removes
    anything; it is kept so the call chain matches the original exactly.
    """
    unsafe_chars = re.compile(r'[^a-zA-Z0-9_-]')
    sanitized = unsafe_chars.sub('_', name)
    return sanitized.strip()
|
|
|
|
# Vista para renderizar la página principal
|
|
def create_tabs_flow(request):
    """Render the ``flow.html`` template for the incoming request."""
    template_name = 'flow.html'
    return render(request, template_name)
|
|
|
|
# Vista para listar todas las pestañas existentes
|
|
def list_tabs(request):
    """Return every saved tab as ``{'tabs': [{'name': ..., 'id': ...}, ...]}``.

    Scans ``<BASE_DIR>/flow/data`` for files named ``tab_<digits>_<name>.json``.
    The tab name comes from the file's ``tabName`` key, falling back to
    ``Pestaña <id>``. Files that cannot be read or parsed are skipped (and
    logged) so one corrupt file does not break the whole listing. Tabs are
    returned in ascending numeric id order.
    """
    import logging

    log = logging.getLogger(__name__)
    data_dir = os.path.join(settings.BASE_DIR, 'flow', 'data')
    tabs = []

    if os.path.isdir(data_dir):
        for filename in os.listdir(data_dir):
            if not filename.endswith('.json'):
                continue
            # Extract the tab id from the file name
            match = re.match(r'tab_(\d+)_(.+)\.json', filename)
            if not match:
                continue

            tab_id = match.group(1)
            file_path = os.path.join(data_dir, filename)
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
            except (OSError, ValueError) as exc:
                # ValueError covers both invalid JSON and invalid UTF-8
                log.warning("Skipping unreadable tab file %s: %s", file_path, exc)
                continue

            default_name = f'Pestaña {tab_id}'
            tab_name = data.get('tabName', default_name) if isinstance(data, dict) else default_name
            tabs.append({
                'name': tab_name,
                'id': tab_id
            })

    # os.listdir() order is arbitrary; ids are all digits, so sort numerically
    tabs.sort(key=lambda tab: int(tab['id']))
    return JsonResponse({'tabs': tabs})
|
|
|
|
# Función para crear y guardar archivos de texto desde el contenido del node3
|
|
# Source of the AGI script that save_tts() writes for a tab.
# It is a raw string so regex/awk backslashes reach the generated file
# untouched, and it is NOT an f-string, so the generated script's own f-string
# braces are written exactly once. Every ``@@NAME@@`` marker is replaced by a
# Python literal (see _render_agi_script).
_AGI_SCRIPT_TEMPLATE = r'''#!/usr/bin/env python3
import speech_recognition as sr
import re
import tts
import subprocess
import os
from asterisk.agi import AGI
import anthropic
import json

agi = AGI()

class CallHandler:
    def __init__(self):
        self.audio_path = "/var/lib/asterisk/sounds/grabacion_usuario.wav"
        self.tab_id = self.get_tab_id()

    def get_tab_id(self):
        # Obtener el tab_id desde una variable de entorno AGI
        # Asegúrate de pasar esta variable desde el Dialplan
        tab_id = agi.env.get('TAB_ID', None)
        if not tab_id:
            agi.verbose("TAB_ID no fue proporcionado. Utilizando 'default_tab_id'.", 1)
            return 'default_tab_id'
        return tab_id

    def grabar_llamada(self):
        callerid = agi.env.get('agi_callerid', '')
        uniqueid = agi.env.get('agi_uniqueid', '')

        # Registrar la información de la llamada
        agi.verbose(f"CallerID: {callerid}", 1)
        agi.verbose(f"UniqueID: {uniqueid}", 1)

        # Iniciar la grabación
        agi.verbose("Iniciando grabación...", 1)
        try:
            result = agi.record_file(
                filename='/var/lib/asterisk/sounds/grabacion_usuario',
                format='wav',
                timeout=4000,  # Puedes ajustar el tiempo de espera según tus necesidades
                offset=0,
                beep=True,
                silence=0,
                escape_digits='#'
            )
        except Exception as e:
            agi.verbose(f"Error al grabar: {e}", 1)
        else:
            agi.verbose(f"Grabación completada. Resultado: {result}", 1)
            self.detectar_audio()

    def detectar_audio(self):
        recognizer = sr.Recognizer()

        with sr.AudioFile(self.audio_path) as source:
            audio_data = recognizer.record(source)
            try:
                text = recognizer.recognize_google(audio_data, language="es-ES")
                agi.verbose(f"La pregunta es: {text}", 1)

                # Llamar a la función consultar_claude con el texto reconocido
                respuesta = self.consultar_claude(text)

                cleaned_text = re.sub(r'^(?:\*+|\d+)', '', respuesta).strip()
                cleaned_text = cleaned_text.replace('%', '')
                tts.convert_text_to_speech(cleaned_text)

                ext = self.determine_extension(respuesta)

                # Reproducir la respuesta generada por Claude
                try:
                    tts.convert_text_to_speech(cleaned_text)
                    agi.stream_file('resp_claude')
                except Exception as e:
                    agi.verbose(f"Error al convertir texto a voz: {e}", 1)

                if ext is not None and ext != '':
                    try:
                        resultado_dial = agi.execute("EXEC Dial", f"Local/{str(ext)}@from-internal,20")
                        agi.verbose(f"Resultado del Dial: {resultado_dial}", 1)
                    except Exception as e:
                        agi.verbose(f"Error ejecutando Dial en AGI: {e}", 1)
                else:
                    agi.verbose("La variable ext no tiene un valor válido.", 1)
                    agi.stream_file('algomas')
                    # Volver a grabar si es necesario
                    self.grabar_llamada()
            except sr.RequestError as e:
                agi.verbose(f"Error al conectarse con el servicio de reconocimiento de voz: {e}", 1)
            except sr.UnknownValueError:
                agi.verbose("No se pudo entender el audio", 1)
                agi.stream_file('no_entendi')
                self.grabar_llamada()

    def determine_extension(self, input_str):
        import unicodedata


        def normalize_str(s):
            return ''.join(
                c for c in unicodedata.normalize('NFD', s)
                if unicodedata.category(c) != 'Mn'
            ).lower()

        agi.verbose(self.tab_id)

        # Definir la ruta al archivo de condiciones basado en tab_id
        conditions_file = @@CONDITIONS_FILE@@

        first_word = input_str.split()[0] if input_str.split() else ''

        try:
            with open(conditions_file, 'r', encoding='utf-8') as f:
                conditions_data = json.load(f)

            # Iterar sobre todas las condiciones de node3
            for node_cond in conditions_data.get('node3_conditions', []):
                for cond in node_cond.get('conditions', []):
                    condition_str = cond.get('condition_string', '').strip()
                    extension_number = cond.get('extensionNumber', '').strip()
                    if first_word == normalize_str(condition_str):
                        agi.stream_file('resp_claude')
                        agi.verbose(f"Condición encontrada: {condition_str} -> Extensión: {extension_number}", 1)
                        try:
                            resultado_dial = agi.execute("EXEC Dial", f"Local/{str(extension_number)}@from-internal,20")
                            agi.verbose(f"Resultado del Dial: {resultado_dial}", 1)
                        except Exception as e:
                            agi.verbose(f"Error ejecutando Dial en AGI: {e}")

        except Exception as e:
            agi.verbose(f"Error al leer o procesar el archivo de condiciones: {e}", 1)

        # Si no se encuentra ninguna condición que coincida
        agi.verbose("No se encontró una condición que coincida con la entrada del usuario.", 1)
        return None

    def consultar_claude(self, text):
        # Leer extensiones (este comando parece específico para tu configuración)
        command = r"""grep -E '^\[|^callerid=' /etc/asterisk/pjsip.endpoint.conf | awk '{ if ($0 ~ /^\[/) { gsub(/[\[\]]/, "", $1); ext = $1; } else if ($0 ~ /^callerid=/) { split($0, arr, "="); callerid = arr[2]; printf "Extension: " ext ", CallerID: " callerid "; "; } }'"""

        result = subprocess.run(command, shell=True, capture_output=True, text=True)
        output = result.stdout.strip()

        # Crear el cliente Anthropic pasando la API key
        client = anthropic.Anthropic(api_key=@@API_KEY@@)

        message = client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=200,
            temperature=0,
            system=(
                @@SYSTEM_PROMPT@@
            ),
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": text
                        }
                    ]
                }
            ]
        )

        respuesta_claude = "".join([msg.text for msg in message.content])
        respuesta_claude = respuesta_claude.replace('%', '')
        agi.verbose(f"Respuesta de Claude: {respuesta_claude}")
        return respuesta_claude

if __name__ == "__main__":
    agi.answer()
    agi.stream_file(@@AUDIO_FILE@@)
    call_handler = CallHandler()
    call_handler.grabar_llamada()
'''


def _render_agi_script(conditions_file, system_prompt, audio_file, api_key):
    """Fill the AGI script template, embedding each value as a ``repr()`` literal."""
    literals = {
        'CONDITIONS_FILE': repr(conditions_file),
        'SYSTEM_PROMPT': repr(system_prompt),
        'AUDIO_FILE': repr(audio_file),
        'API_KEY': repr(api_key),
    }
    # Single pass with a callable: inserted text is never rescanned for
    # markers and backslashes in the values are not treated as regex escapes.
    return re.sub(r'@@([A-Z_]+)@@', lambda m: literals[m.group(1)], _AGI_SCRIPT_TEMPLATE)


@csrf_exempt
def save_tts(request):
    """Generate and install the Asterisk AGI script for a tab.

    Expects a POST with a JSON body containing ``tabId``, ``tabName``,
    ``content`` (used as the Claude system prompt) and optionally
    ``audioFile`` (greeting sound name, default ``saludo_claude``). The script
    is written to ``/var/lib/asterisk/agi-bin/<tabName>.py``, made executable
    and chowned to ``asterisk:asterisk`` via ``sudo``.

    Returns ``{'status': 'success', 'filePath': ...}`` on success, 405 for
    non-POST requests, 400 for missing or invalid fields and 500 otherwise.

    Differences from the previous implementation:
    * The Anthropic API key is no longer hard-coded in this file. It is read
      from ``settings.ANTHROPIC_API_KEY`` or the ``ANTHROPIC_API_KEY``
      environment variable. NOTE(review): the key that used to be committed
      here should be treated as leaked and rotated.
    * Request values are embedded in the generated script as ``repr()``
      literals, so quotes, backslashes or newlines in them can neither break
      nor inject code into the script.
    * ``tabName`` values containing a directory component are rejected so the
      script cannot be written outside the AGI directory.
    * A non-string ``tabId`` no longer raises ``TypeError``.
    * The generated script's log f-strings no longer print literal braces.
    """
    if request.method != 'POST':
        return JsonResponse({'status': 'error', 'message': 'Solo se permiten solicitudes POST'}, status=405)

    try:
        data = json.loads(request.body)
        tab_id = data.get('tabId')
        tab_name = data.get('tabName')
        content = data.get('content')
        # Default greeting when the client supplies no audio file name
        audio_file = data.get('audioFile') or 'saludo_claude'

        if tab_id is None or not tab_name or content is None:
            return JsonResponse({'status': 'error', 'message': 'Datos incompletos'}, status=400)

        script_name = str(tab_name)
        if os.path.basename(script_name) != script_name:
            return JsonResponse({'status': 'error', 'message': 'Nombre de pestaña inválido'}, status=400)

        api_key = getattr(settings, 'ANTHROPIC_API_KEY', None) or os.environ.get('ANTHROPIC_API_KEY')
        if not api_key:
            return JsonResponse(
                {'status': 'error', 'message': 'ANTHROPIC_API_KEY no está configurada'},
                status=500,
            )

        py_filepath = os.path.join('/var/lib/asterisk/agi-bin/', f'{script_name}.py')
        file_content = _render_agi_script(
            conditions_file=f"/home/Flow_IA/IVR/tab_{tab_id}_conditions.json",
            system_prompt=str(content),
            audio_file=str(audio_file),
            api_key=str(api_key),
        )

        try:
            with open(py_filepath, 'w', encoding='utf-8') as file:
                file.write(file_content)

            # NOTE(review): the script embeds the API key and is world-readable (0o755)
            os.chmod(py_filepath, 0o755)

            subprocess.run(['sudo', 'chown', 'asterisk:asterisk', py_filepath], check=True)

        except Exception as e:
            return JsonResponse({'status': 'error', 'message': f'Error al crear el archivo: {e}'}, status=500)

        return JsonResponse({
            'status': 'success',
            'filePath': py_filepath
        })

    except Exception as e:
        return JsonResponse({'status': 'error', 'message': str(e)}, status=500)
|
|
|
|
# Vista para guardar datos de la pestaña
|
|
def _filter_node_fields(node, tab_id):
    """Return the subset of ``node`` that is persisted for its ``nodeType``.

    Nodes of an unrecognised type are returned unchanged.
    """
    node_type = node.get('nodeType')

    if node_type == 'node1':
        return {
            'id': node.get('id'),
            'title': node.get('title'),
            'extensionNumber': node.get('extensionNumber'),
            'position': node.get('position'),
            'nodeType': node_type,
        }

    if node_type == 'node2':
        return {
            'id': node.get('id'),
            'title': node.get('title'),
            # Stored as received (expected to be the relative media URL)
            'audioFile': node.get('audioFile') or "",
            'position': node.get('position'),
            'nodeType': node_type,
        }

    if node_type == 'node3':
        return {
            'id': node.get('id'),
            'title': node.get('title'),
            'content': node.get('content'),
            'position': node.get('position'),
            'nodeType': node_type,
            'textFile': node.get('textFile', ""),
            'conditions': node.get('conditions', []),
        }

    if node_type == 'node4':
        return {
            'id': node.get('id'),
            'title': node.get('title'),
            'position': node.get('position'),
            'nodeType': node_type,
            'extensionNumber': node.get('extensionNumber', ""),
        }

    if node_type == 'node6':
        audio_file = node.get('audioFile', "")
        if not audio_file:
            audio_file_url = ""
        elif audio_file.startswith('/media/'):
            audio_file_url = audio_file  # Already the relative URL
        else:
            # Bare file name: prefix it with this tab's audio directory URL
            audio_file_url = os.path.join(
                '/media/tabs/tab_{}/audio_files/'.format(tab_id), audio_file
            ).replace('\\', '/')
        return {
            'id': node.get('id'),
            'title': node.get('title'),
            'content': node.get('content', ""),
            'audioFile': audio_file_url,
            'position': node.get('position'),
            'nodeType': node_type,
        }

    return node


def _build_node3_conditions(tab_id, tab_name, filtered_nodes):
    """Build the conditions document: each node3 condition with its node4 target's extension."""
    nodes_by_id = {node.get('id'): node for node in filtered_nodes}
    conditions_data = {
        'tabId': tab_id,
        'tabName': tab_name,
        'node3_conditions': []
    }

    for node in filtered_nodes:
        if node.get('nodeType') != 'node3':
            continue

        node_info = {
            'nodeId': node.get('id'),
            'title': node.get('title'),
            'conditions': []
        }

        for condition in node.get('conditions') or []:
            if not isinstance(condition, dict):
                continue
            condition_str = str(condition.get('condition_string') or '').strip()
            target_node_id = condition.get('target_node_id')
            if isinstance(target_node_id, str):
                target_node_id = target_node_id.strip()

            # Only a condition pointing at a node4 carries an extension
            extension_number = ""
            if isinstance(target_node_id, (str, int)):
                target = nodes_by_id.get(target_node_id)
                if target and target.get('nodeType') == 'node4':
                    extension_number = target.get('extensionNumber') or ""

            node_info['conditions'].append({
                'condition_string': condition_str,
                'extensionNumber': extension_number
            })

        conditions_data['node3_conditions'].append(node_info)

    return conditions_data


@csrf_exempt
def save_tab_data(request):
    """Persist a tab's nodes and connections, plus its node3 conditions file.

    Expects a POST with a JSON body containing ``tabId``, ``tabName``,
    ``nodes`` and ``connections``. Writes
    ``<BASE_DIR>/flow/data/tab_<tabId>_<safe name>.json`` and
    ``/home/Flow_IA/IVR/tab_<tabId>_conditions.json``.

    Both documents are fully built before anything is written, so malformed
    node data produces an error response without leaving one file saved and
    the other missing. A ``tabId`` containing a directory component is
    rejected with 400.

    Returns ``{'message': ...}`` on success, or ``{'error': ...}`` with status
    405, 400 or 500.
    """
    if request.method != 'POST':
        return JsonResponse({'error': 'Solo se permiten solicitudes POST'}, status=405)

    try:
        data = json.loads(request.body)
        tab_id = data.get('tabId')
        tab_name = data.get('tabName')
        nodes = data.get('nodes')
        connections = data.get('connections')

        if not tab_id or nodes is None or connections is None or not tab_name:
            return JsonResponse({'error': 'Datos incompletos'}, status=400)

        # tab_id becomes part of two file names; refuse directory components
        tab_key = str(tab_id)
        if os.path.basename(tab_key) != tab_key:
            return JsonResponse({'error': 'Identificador de pestaña inválido'}, status=400)

        safe_tab_name = get_safe_filename(tab_name)

        # File name format: tab_<tabId>_<safe_tab_name>.json
        data_dir = os.path.join(settings.BASE_DIR, 'flow', 'data')
        file_path = os.path.join(data_dir, f'tab_{tab_id}_{safe_tab_name}.json')

        ivr_dir = '/home/Flow_IA/IVR/'
        conditions_file_path = os.path.join(ivr_dir, f'tab_{tab_id}_conditions.json')

        # Build everything in memory first; any failure happens before a write
        filtered_nodes = [_filter_node_fields(node, tab_id) for node in nodes]
        conditions_data = _build_node3_conditions(tab_id, tab_name, filtered_nodes)

        os.makedirs(data_dir, exist_ok=True)
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump({'tabName': tab_name, 'nodes': filtered_nodes, 'connections': connections}, f, ensure_ascii=False, indent=4)

        os.makedirs(ivr_dir, exist_ok=True)
        with open(conditions_file_path, 'w', encoding='utf-8') as json_file:
            json.dump(conditions_data, json_file, ensure_ascii=False, indent=4)

        return JsonResponse({'message': 'Datos guardados exitosamente'})

    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)
|
|
|
|
# Vista para cargar los datos de una pestaña específica
|
|
def load_tab_data(request, tab_id):
    """Return the saved JSON document for ``tab_id``.

    Looks in ``<BASE_DIR>/flow/data`` for ``tab_<tab_id>_*.json``. If no file
    (or no data directory) exists, an empty tab named ``Pestaña <tab_id>`` is
    returned. When several files match, the alphabetically first one is used
    so the choice is stable. Any ``audioFile`` key is stripped from node1,
    node3 and node4 entries before returning.

    Errors are logged and reported as ``{'error': ...}`` with status 500.
    """
    import logging

    # The module defines no ``logger`` in the visible source, so the original
    # error handler would itself fail with NameError; use a local logger.
    log = logging.getLogger(__name__)

    try:
        data_dir = os.path.join(settings.BASE_DIR, 'flow', 'data')
        prefix = f'tab_{tab_id}_'
        try:
            entries = os.listdir(data_dir)
        except FileNotFoundError:
            entries = []

        matching_files = sorted(f for f in entries if f.startswith(prefix) and f.endswith('.json'))
        if not matching_files:
            return JsonResponse({'nodes': [], 'connections': [], 'tabName': f'Pestaña {tab_id}'})

        file_path = os.path.join(data_dir, matching_files[0])

        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # node1, node3 and node4 must not carry an audioFile
        for node in data.get('nodes', []):
            if node.get('nodeType') in ('node1', 'node3', 'node4'):
                node.pop('audioFile', None)

        return JsonResponse(data)

    except Exception as e:
        log.error("Error en load_tab_data: %s", e)
        return JsonResponse({'error': str(e)}, status=500)
|
|
|
|
# Nueva Vista para eliminar archivos de Node3 cuando se actualiza
|
|
@csrf_exempt
def delete_files(request):
    """Delete the text file stored for a node3 (used when the node is updated).

    Expects a POST with a JSON body containing ``tabId``, ``nodeId``,
    ``nodeType`` and ``oldTitle``. The target is
    ``<MEDIA_ROOT>/tabs/tab_<tabId>/text_files/<title>_<nodeType>_<nodeId>.txt``
    where non-alphanumeric title characters are replaced by ``_``.

    ``tabId`` and ``nodeId`` become path components, so values containing a
    path separator are rejected with 400. For node types other than node3 the
    response keeps the original shape: ``status: error`` with HTTP 200.
    """
    if request.method != 'POST':
        return JsonResponse({'status': 'error', 'message': 'Solo se permiten solicitudes POST'}, status=405)

    try:
        data = json.loads(request.body)
        tab_id = data.get('tabId')
        node_id = data.get('nodeId')
        node_type = data.get('nodeType')
        old_title = data.get('oldTitle')

        if not tab_id or not node_id or not node_type or not old_title:
            return JsonResponse({'status': 'error', 'message': 'Datos incompletos'}, status=400)

        # Only node3 has text files
        if node_type != "node3":
            return JsonResponse({'status': 'error', 'message': 'Este nodo no tiene archivos para eliminar'}, status=200)

        # Without separators, the ids cannot move the path out of text_files
        if any(sep in f"{tab_id}{node_id}" for sep in ('/', '\\', '\x00')):
            return JsonResponse({'status': 'error', 'message': 'Identificador inválido'}, status=400)

        tab_dir = os.path.join(settings.MEDIA_ROOT, 'tabs', f'tab_{tab_id}')
        text_dir = os.path.join(tab_dir, 'text_files')
        os.makedirs(text_dir, exist_ok=True)

        # File name is derived from the old title, the node type and the node id
        safe_old_title = "".join(c if c.isalnum() else "_" for c in old_title)
        txt_filename = f"{safe_old_title}_{node_type}_{node_id}.txt"
        txt_filepath = os.path.join(text_dir, txt_filename)

        # Remove the file if present; a missing file is not an error
        try:
            os.remove(txt_filepath)
        except FileNotFoundError:
            pass

        return JsonResponse({'status': 'success', 'message': 'Archivo de texto eliminado correctamente'})

    except Exception as e:
        return JsonResponse({'status': 'error', 'message': str(e)}, status=500)
|
|
|
|
# Vista para subir audio para Node2
|
|
@csrf_exempt
def upload_audio(request):
    """Store an uploaded audio file for a node2 and copy it to Asterisk.

    Expects a multipart POST with ``tabId``, ``nodeId``, ``nodeType``
    (must be ``node2``), ``title`` and the file ``audioFile``. The file is
    saved as
    ``<MEDIA_ROOT>/tabs/tab_<tabId>/audio_files/<title>_<nodeType>_<nodeId>.wav``
    (non-alphanumeric title characters become ``_``) and then copied into
    ``/var/lib/asterisk/sounds``.

    ``tabId`` and ``nodeId`` become path components, so values containing a
    path separator are rejected with 400.

    Returns ``{'status': 'success', 'audioFile': <url>, 'asteriskFile': <path>}``.
    """
    if request.method != 'POST':
        return JsonResponse({'status': 'error', 'message': 'Solo se permiten solicitudes POST'}, status=405)

    try:
        tab_id = request.POST.get('tabId')
        node_id = request.POST.get('nodeId')
        node_type = request.POST.get('nodeType')
        title = request.POST.get('title')
        audio_file = request.FILES.get('audioFile')

        if not tab_id or not node_id or not node_type or not title or not audio_file:
            return JsonResponse({'status': 'error', 'message': 'Datos incompletos'}, status=400)

        # This endpoint only serves node2
        if node_type != "node2":
            return JsonResponse({'status': 'error', 'message': 'Este endpoint solo es para node2'}, status=400)

        # Without separators, the ids cannot move the path out of audio_files
        if any(sep in f"{tab_id}{node_id}" for sep in ('/', '\\', '\x00')):
            return JsonResponse({'status': 'error', 'message': 'Identificador inválido'}, status=400)

        # Per-tab media directory
        tab_dir = os.path.join(settings.MEDIA_ROOT, 'tabs', f'tab_{tab_id}')
        audio_dir = os.path.join(tab_dir, 'audio_files')
        os.makedirs(audio_dir, exist_ok=True)

        # The node id keeps the file name unique within the tab
        safe_title = "".join(c if c.isalnum() else "_" for c in title)
        wav_filename = f"{safe_title}_{node_type}_{node_id}.wav"
        wav_filepath = os.path.join(audio_dir, wav_filename)

        # Stream the upload to disk chunk by chunk
        with open(wav_filepath, 'wb+') as destination:
            for chunk in audio_file.chunks():
                destination.write(chunk)

        # Copy the file into the Asterisk sounds directory
        asterisk_dir = '/var/lib/asterisk/sounds'
        os.makedirs(asterisk_dir, exist_ok=True)
        asterisk_filepath = os.path.join(asterisk_dir, wav_filename)
        shutil.copy(wav_filepath, asterisk_filepath)

        # Build the URL; normalise separators as rename_audio does
        audio_file_url = os.path.join(settings.MEDIA_URL, 'tabs', f'tab_{tab_id}', 'audio_files', wav_filename)
        audio_file_url = audio_file_url.replace('\\', '/')

        return JsonResponse({
            'status': 'success',
            'audioFile': audio_file_url,
            'asteriskFile': asterisk_filepath  # Path of the copy inside Asterisk
        })

    except Exception as e:
        return JsonResponse({'status': 'error', 'message': str(e)}, status=500)
|
|
|
|
# Vista para eliminar audio de Node2 y Node6
|
|
@csrf_exempt
def delete_audio(request):
    """Delete the media audio file of a node2 or node6.

    Expects a POST with a JSON body containing ``tabId``, ``nodeId``,
    ``nodeType`` (``node2`` or ``node6``) and ``oldAudioFile`` (the file's
    media URL, e.g. ``/media/tabs/tab_1/audio_files/x.wav``).

    The URL is mapped onto ``settings.MEDIA_ROOT``, which is where the upload
    and rename views store audio. The previous code joined it onto
    ``BASE_DIR``, and an absolute ``oldAudioFile`` made ``os.path.join``
    discard the base entirely, allowing deletion of arbitrary files. Paths
    that resolve outside ``MEDIA_ROOT`` are now rejected with 400.
    """
    if request.method != 'POST':
        return JsonResponse({'status': 'error', 'message': 'Solo se permiten solicitudes POST'}, status=405)

    try:
        data = json.loads(request.body)
        tab_id = data.get('tabId')
        node_id = data.get('nodeId')
        node_type = data.get('nodeType')
        old_audio_file = data.get('oldAudioFile')

        if not tab_id or not node_id or not node_type or not old_audio_file:
            return JsonResponse({'status': 'error', 'message': 'Datos incompletos'}, status=400)

        # This endpoint only serves node2 and node6
        if node_type not in ["node2", "node6"]:
            return JsonResponse({'status': 'error', 'message': 'Este endpoint solo es para node2 o node6'}, status=400)

        # Turn the media URL into a path relative to MEDIA_ROOT
        relative_path = str(old_audio_file).lstrip('/\\')
        url_prefix = str(settings.MEDIA_URL).strip('/')
        if url_prefix and relative_path.startswith(url_prefix + '/'):
            relative_path = relative_path[len(url_prefix) + 1:]

        media_root = os.path.abspath(settings.MEDIA_ROOT)
        audio_file_path = os.path.abspath(os.path.join(media_root, relative_path))

        # Refuse anything that escapes MEDIA_ROOT (e.g. via '..')
        if os.path.commonpath([media_root, audio_file_path]) != media_root:
            return JsonResponse({'status': 'error', 'message': 'Ruta de archivo inválida'}, status=400)

        # A missing file is not an error
        try:
            os.remove(audio_file_path)
        except FileNotFoundError:
            pass

        return JsonResponse({'status': 'success', 'message': 'Archivo de audio eliminado correctamente'})

    except Exception as e:
        return JsonResponse({'status': 'error', 'message': str(e)}, status=500)
|
|
|
|
# Vista para renombrar audio de Node2 y Node6
|
|
@csrf_exempt
def rename_audio(request):
    """Rename the audio file of a node2 or node6 after its title changes.

    Expects a POST with a JSON body containing ``tabId``, ``nodeId``,
    ``nodeType`` (``node2`` or ``node6``), ``oldTitle``, ``newTitle`` and
    ``currentAudioFile`` (required but not otherwise used). The file
    ``<nodeId>_<safe old title>.wav`` in
    ``<MEDIA_ROOT>/tabs/tab_<tabId>/audio_files`` is renamed to
    ``<nodeId>_<safe new title>.wav`` and the renamed file is copied into
    ``/var/lib/asterisk/sounds``.

    ``tabId`` and ``nodeId`` become path components, so values containing a
    path separator are rejected with 400. Returns the new media URL as
    ``newAudioFile``, or 404 when the original file does not exist.
    """
    if request.method != 'POST':
        return JsonResponse({'status': 'error', 'message': 'Solo se permiten solicitudes POST'}, status=405)

    try:
        data = json.loads(request.body)
        tab_id = data.get('tabId')
        node_id = data.get('nodeId')
        node_type = data.get('nodeType')
        old_title = data.get('oldTitle')
        new_title = data.get('newTitle')
        current_audio_file = data.get('currentAudioFile')

        if not tab_id or not node_id or not node_type or not old_title or not new_title or not current_audio_file:
            return JsonResponse({'status': 'error', 'message': 'Datos incompletos'}, status=400)

        # This endpoint only serves node2 and node6
        if node_type not in ["node2", "node6"]:
            return JsonResponse({'status': 'error', 'message': 'Este endpoint solo es para node2 o node6'}, status=400)

        # Without separators, the ids cannot move the paths out of audio_files
        if any(sep in f"{tab_id}{node_id}" for sep in ('/', '\\', '\x00')):
            return JsonResponse({'status': 'error', 'message': 'Identificador inválido'}, status=400)

        # Per-tab audio directory
        audio_dir = os.path.join(settings.MEDIA_ROOT, 'tabs', f'tab_{tab_id}', 'audio_files')
        os.makedirs(audio_dir, exist_ok=True)

        # File names are built from the node id and the sanitised titles
        old_wav_filename = f"{node_id}_{get_safe_filename(old_title)}.wav"
        new_wav_filename = f"{node_id}_{get_safe_filename(new_title)}.wav"
        old_wav_filepath = os.path.join(audio_dir, old_wav_filename)
        new_wav_filepath = os.path.join(audio_dir, new_wav_filename)

        if not os.path.exists(old_wav_filepath):
            return JsonResponse({'status': 'error', 'message': 'Archivo de audio original no encontrado'}, status=404)

        os.rename(old_wav_filepath, new_wav_filepath)

        # Copy the renamed file into the Asterisk sounds directory
        asterisk_dir = '/var/lib/asterisk/sounds'
        asterisk_filepath = os.path.join(asterisk_dir, new_wav_filename)
        shutil.copy(new_wav_filepath, asterisk_filepath)

        # Build the new URL, forcing forward slashes (os.path.join uses '\\' on Windows)
        new_audio_file_url = os.path.join(settings.MEDIA_URL, 'tabs', f'tab_{tab_id}', 'audio_files', new_wav_filename)
        new_audio_file_url = new_audio_file_url.replace('\\', '/')
        return JsonResponse({
            'status': 'success',
            'newAudioFile': new_audio_file_url  # Relative URL
        })

    except Exception as e:
        return JsonResponse({'status': 'error', 'message': str(e)}, status=500)
|
|
|
|
# Vista para renombrar una pestaña
|
|
@csrf_exempt
def rename_tab(request):
    """Rename a tab: update ``tabName`` in its JSON file and rename the file.

    Expects a POST with a JSON body containing ``tabId`` and ``newTabName``.
    The tab file ``tab_<tabId>_*.json`` in ``<BASE_DIR>/flow/data`` becomes
    ``tab_<tabId>_<safe new name>.json``.

    The existing file is parsed before anything is changed and the updated
    document is written to the new path before the old file is removed, so a
    corrupt or unreadable file is left untouched instead of being renamed
    halfway. Returns 404 when no tab file (or no data directory) exists.
    """
    if request.method != 'POST':
        return JsonResponse({'status': 'error', 'message': 'Solo se permiten solicitudes POST'}, status=405)

    try:
        data = json.loads(request.body)
        tab_id = data.get('tabId')
        new_tab_name = data.get('newTabName')

        if not tab_id or not new_tab_name:
            return JsonResponse({'status': 'error', 'message': 'Datos incompletos'}, status=400)

        safe_new_tab_name = get_safe_filename(new_tab_name)
        data_dir = os.path.join(settings.BASE_DIR, 'flow', 'data')

        # Find the JSON file for this tab id (sorted for a stable choice)
        prefix = f'tab_{tab_id}_'
        try:
            entries = os.listdir(data_dir)
        except FileNotFoundError:
            entries = []
        matching_files = sorted(f for f in entries if f.startswith(prefix) and f.endswith('.json'))
        if not matching_files:
            return JsonResponse({'status': 'error', 'message': 'Archivo de pestaña no encontrado'}, status=404)

        old_file_path = os.path.join(data_dir, matching_files[0])
        new_file_path = os.path.join(data_dir, f'tab_{tab_id}_{safe_new_tab_name}.json')

        # Parse first: if this fails, nothing on disk has changed yet
        with open(old_file_path, 'r', encoding='utf-8') as f:
            tab_data = json.load(f)

        tab_data['tabName'] = new_tab_name

        with open(new_file_path, 'w', encoding='utf-8') as f:
            json.dump(tab_data, f, ensure_ascii=False, indent=4)

        # samefile() also covers case-insensitive filesystems, where two
        # different spellings can name the file that was just written
        if not os.path.samefile(old_file_path, new_file_path):
            os.remove(old_file_path)

        return JsonResponse({'status': 'success', 'message': 'Pestaña renombrada exitosamente'})

    except Exception as e:
        return JsonResponse({'status': 'error', 'message': str(e)}, status=500)
|
|
|
|
# Vista para actualizar Node1 (título y extensión)
|
|
@csrf_exempt
def update_node1(request):
    """Update a node1's title and extension, and append its dialplan entry.

    Expects a POST with a JSON body containing ``tabId``, ``tabName``,
    ``nodeId``, ``nodeType`` (must be ``node1``), ``newTitle``,
    ``newExtension`` and ``oldTitle``.

    After validation, three ``exten =>`` lines that run
    ``/var/lib/asterisk/agi-bin/<tabName>.py`` are appended to
    ``/etc/asterisk/extensions_custom.conf`` (a ``.bak`` copy is made first).
    That write is best-effort: a failure is logged and the JSON update still
    proceeds. The node is then updated inside the tab's JSON file.

    Differences from the previous implementation:
    * Inputs are validated BEFORE the dialplan is touched, so incomplete
      requests no longer append ``exten => None,...`` lines.
    * Extensions and tab names containing control characters (or, for the
      extension, whitespace or commas) are rejected so they cannot inject
      extra dialplan lines.
    * A local logger replaces the module-level ``logger`` that is not defined
      in the visible source.
    * The tab file is located with a prefix/suffix match instead of an
      unescaped regex built from ``tabId``.

    NOTE(review): every call appends a new set of lines; existing entries for
    the same extension are not replaced.
    """
    import logging

    log = logging.getLogger(__name__)

    if request.method != 'POST':
        return JsonResponse({'status': 'error', 'message': 'Solo se permiten solicitudes POST'}, status=405)

    try:
        data = json.loads(request.body)
        tab_id = data.get('tabId')
        tab_name = data.get('tabName')
        node_id = data.get('nodeId')
        node_type = data.get('nodeType')
        new_title = data.get('newTitle')
        new_extension = data.get('newExtension')
        old_title = data.get('oldTitle')

        if not tab_id or not node_id or not node_type or not new_title or not new_extension or not old_title:
            return JsonResponse({'status': 'error', 'message': 'Datos incompletos'}, status=400)

        # This endpoint only serves node1
        if node_type != "node1":
            return JsonResponse({'status': 'error', 'message': 'Este endpoint solo es para node1'}, status=400)

        extension = str(new_extension)
        if any(ch.isspace() or ch == ',' or not ch.isprintable() for ch in extension):
            return JsonResponse({'status': 'error', 'message': 'Extensión inválida'}, status=400)

        if tab_name:
            script_name = str(tab_name)
            if '/' in script_name or not script_name.isprintable():
                return JsonResponse({'status': 'error', 'message': 'Nombre de pestaña inválido'}, status=400)

            # Asterisk custom-extensions configuration file
            directory = '/etc/asterisk/extensions_custom.conf'
            file_content = (
                "\n"
                f"exten => {extension},1,Answer()\n"
                f"exten => {extension},n,AGI(/var/lib/asterisk/agi-bin/{script_name}.py)\n"
                f"exten => {extension},n,Hangup()\n"
            )

            try:
                shutil.copyfile(directory, f"{directory}.bak")  # Keep a backup copy
                with open(directory, 'a') as conf_file:
                    conf_file.write(file_content)
            except OSError as e:
                log.error("No fue posible crear o escribir en el archivo %s: %s", directory, e)
        else:
            log.warning("tabName missing; dialplan entry for extension %s not written", extension)

        # Find the JSON file for this tab (sorted for a stable choice)
        data_dir = os.path.join(settings.BASE_DIR, 'flow', 'data')
        prefix = f'tab_{tab_id}_'
        matching_files = sorted(
            f for f in os.listdir(data_dir) if f.startswith(prefix) and f.endswith('.json')
        )
        if not matching_files:
            return JsonResponse({'status': 'error', 'message': 'Archivo de pestaña no encontrado'}, status=404)

        file_path = os.path.join(data_dir, matching_files[0])

        with open(file_path, 'r', encoding='utf-8') as f:
            tab_data = json.load(f)

        # Update the first node1 with this id
        for node in tab_data.get('nodes', []):
            if node.get('id') == node_id and node.get('nodeType') == 'node1':
                node['title'] = new_title
                node['extensionNumber'] = new_extension
                break

        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(tab_data, f, ensure_ascii=False, indent=4)

        return JsonResponse({
            'status': 'success',
            'message': 'Nodo1 actualizado exitosamente'
        })

    except Exception as e:
        return JsonResponse({'status': 'error', 'message': str(e)}, status=500)
|
|
|
|
# Vista para eliminar un nodo
|
|
@csrf_exempt
def delete_node(request):
    """Remove a node, its connections and its media file from a tab.

    Expects a POST with a JSON body containing ``tabId``, ``nodeId`` and
    ``nodeType``. The node and every connection that starts or ends at it are
    dropped from the tab's JSON file. For node3 the matching text file is
    removed; for node2/node6 the matching ``.WAV`` file is removed.
    """
    if request.method != 'POST':
        return JsonResponse({'status': 'error', 'message': 'Solo se permiten solicitudes POST'}, status=405)

    def _underscored(text):
        # Replace every non-alphanumeric character with an underscore
        return "".join(ch if ch.isalnum() else "_" for ch in text)

    try:
        payload = json.loads(request.body)
        tab_id = payload.get('tabId')
        node_id = payload.get('nodeId')
        node_type = payload.get('nodeType')

        if not (tab_id and node_id and node_type):
            return JsonResponse({'status': 'error', 'message': 'Datos incompletos'}, status=400)

        # Locate the JSON file that belongs to this tab
        data_dir = os.path.join(settings.BASE_DIR, 'flow', 'data')
        prefix = f'tab_{tab_id}_'
        candidates = [
            name for name in os.listdir(data_dir)
            if name.startswith(prefix) and name.endswith('.json')
        ]
        if not candidates:
            return JsonResponse({'status': 'error', 'message': 'Archivo de pestaña no encontrado'}, status=404)

        file_path = os.path.join(data_dir, candidates[0])
        with open(file_path, 'r', encoding='utf-8') as fh:
            tab_data = json.load(fh)

        # Grab the node's data before it is removed
        node_data = None
        for candidate in tab_data.get('nodes', []):
            if candidate['id'] == node_id:
                node_data = candidate
                break
        if not node_data:
            return JsonResponse({'status': 'error', 'message': 'Nodo no encontrado en la pestaña'}, status=404)
        node_title = node_data.get('title', '')

        # Drop the node and every connection touching it
        remaining_nodes = []
        for entry in tab_data.get('nodes', []):
            if entry['id'] != node_id:
                remaining_nodes.append(entry)
        tab_data['nodes'] = remaining_nodes

        remaining_connections = []
        for conn in tab_data.get('connections', []):
            if not (conn['startNodeId'] == node_id or conn['endNodeId'] == node_id):
                remaining_connections.append(conn)
        tab_data['connections'] = remaining_connections

        # node3 owns a text file; node2 and node6 own an audio file
        tab_media_dir = os.path.join(settings.MEDIA_ROOT, 'tabs', f'tab_{tab_id}')
        if node_type == 'node3':
            txt_filepath = os.path.join(
                tab_media_dir, 'text_files', f"{_underscored(node_title)}_node3_{node_id}.txt"
            )
            if os.path.exists(txt_filepath):
                os.remove(txt_filepath)
        elif node_type in ['node2', 'node6']:
            wav_filepath = os.path.join(
                tab_media_dir, 'audio_files', f"{node_id}_{_underscored(node_title)}.WAV"
            )
            if os.path.exists(wav_filepath):
                os.remove(wav_filepath)

        # Persist the updated tab
        with open(file_path, 'w', encoding='utf-8') as fh:
            json.dump(tab_data, fh, ensure_ascii=False, indent=4)

        return JsonResponse({'status': 'success', 'message': 'Nodo y sus conexiones eliminados exitosamente'})

    except Exception as e:
        return JsonResponse({'status': 'error', 'message': str(e)}, status=500)
|
|
|
|
# Vista para eliminar una pestaña
|
|
@csrf_exempt
def delete_tab(request):
    """Delete a tab's data file(s), media directory and conditions file.

    Expects a POST with a JSON body containing ``tabId`` and ``tabName``
    (``tabName`` is required but only used for validation). Returns 404 when
    no ``tab_<tabId>_*.json`` file exists.

    Every matching JSON file is removed, not just the first one: saving a tab
    under a changed name leaves more than one file for the same id, and a
    leftover file would keep the "deleted" tab visible in the tab listing.
    """
    if request.method != 'POST':
        return JsonResponse({'status': 'error', 'message': 'Solo se permiten solicitudes POST'}, status=405)

    try:
        data = json.loads(request.body)
        tab_id = data.get('tabId')
        tab_name = data.get('tabName')

        if not tab_id or not tab_name:
            return JsonResponse({'status': 'error', 'message': 'Datos incompletos'}, status=400)

        # Find the tab's JSON file(s)
        data_dir = os.path.join(settings.BASE_DIR, 'flow', 'data')
        prefix = f'tab_{tab_id}_'
        matching_files = [f for f in os.listdir(data_dir) if f.startswith(prefix) and f.endswith('.json')]
        if not matching_files:
            return JsonResponse({'status': 'error', 'message': 'Archivo de pestaña no encontrado'}, status=404)

        for filename in matching_files:
            os.remove(os.path.join(data_dir, filename))

        # Remove the tab's media (audio_files and text_files)
        media_dir = os.path.join(settings.MEDIA_ROOT, 'tabs', f'tab_{tab_id}')
        if os.path.exists(media_dir):
            shutil.rmtree(media_dir)

        # Remove the conditions JSON file
        ivr_dir = '/home/Flow_IA/IVR/'
        conditions_file_path = os.path.join(ivr_dir, f'tab_{tab_id}_conditions.json')
        if os.path.exists(conditions_file_path):
            os.remove(conditions_file_path)

        return JsonResponse({'status': 'success', 'message': 'Pestaña eliminada exitosamente'})

    except Exception as e:
        return JsonResponse({'status': 'error', 'message': str(e)}, status=500)
|
|
|
|
# Vista para actualizar Node4
|
|
@csrf_exempt
def update_node4(request):
    """Store an extension number on a ``node4`` node of a tab.

    Expects a POST request whose JSON body contains ``tabId``, ``nodeId``,
    ``nodeType`` (must be ``"node4"``) and ``extensionNumber``. ``tabName`` is
    read from the body but not used. The tab's JSON file in
    ``<BASE_DIR>/flow/data`` is loaded, the matching node gets its
    ``extensionNumber`` replaced, and the file is written back.

    Returns a ``JsonResponse`` with status 405 for non-POST requests, 400 for
    incomplete data or a wrong node type, 404 when the tab file or the node is
    not found, 500 on any other error, and 200 on success.
    """
    if request.method != 'POST':
        return JsonResponse({'status': 'error', 'message': 'Solo se permiten solicitudes POST'}, status=405)

    try:
        data = json.loads(request.body)
        tab_id = data.get('tabId')
        tab_name = data.get('tabName')
        node_id = data.get('nodeId')
        node_type = data.get('nodeType')
        extension_number = data.get('extensionNumber')

        # extension_number is compared against None so that falsy values
        # such as 0 or "" are still accepted.
        if not tab_id or not node_id or not node_type or extension_number is None:
            return JsonResponse({'status': 'error', 'message': 'Datos incompletos'}, status=400)

        if node_type != "node4":
            return JsonResponse({'status': 'error', 'message': 'Este endpoint solo es para node4'}, status=400)

        # Locate the tab's JSON file. tab_id comes from the request, so it is
        # escaped before being embedded in the pattern, and the whole file name
        # must match so that e.g. "tab_1_x.json.bak" is not picked up.
        data_dir = os.path.join(settings.BASE_DIR, 'flow', 'data')
        tab_file_pattern = re.compile(rf'tab_{re.escape(str(tab_id))}_.+\.json')
        matching_files = [name for name in os.listdir(data_dir) if tab_file_pattern.fullmatch(name)]
        if not matching_files:
            return JsonResponse({'status': 'error', 'message': 'Archivo de pestaña no encontrado'}, status=404)

        file_path = os.path.join(data_dir, matching_files[0])

        with open(file_path, 'r', encoding='utf-8') as f:
            tab_data = json.load(f)

        # Update the first node4 with the requested id; .get() keeps nodes that
        # lack "id" or "nodeType" from aborting the search with a KeyError.
        node_found = False
        for node in tab_data.get('nodes', []):
            if node.get('id') == node_id and node.get('nodeType') == 'node4':
                node['extensionNumber'] = extension_number
                node_found = True
                break

        if not node_found:
            return JsonResponse({'status': 'error', 'message': 'node4 no encontrado'}, status=404)

        # Persist the updated tab data.
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(tab_data, f, ensure_ascii=False, indent=4)

        return JsonResponse({'status': 'success', 'message': 'node4 actualizado exitosamente'})

    except Exception as e:
        return JsonResponse({'status': 'error', 'message': str(e)}, status=500)
|
|
|
|
# Vista para generar audios por medio del TTS
|
|
@csrf_exempt
def tts_module(request, language="es"):
    """Generate a WAV file from text with gTTS and store it for a tab.

    Expects a POST request whose JSON body contains ``tabId``, ``title``,
    ``text``, ``nodeId`` and ``nodeType``. The audio is written to
    ``<MEDIA_ROOT>/tabs/tab_<tabId>/audio_files/<title>_<nodeType>_<nodeId>.wav``
    and then copied to ``/var/lib/asterisk/sounds``.

    Args:
        request: the incoming Django request.
        language: language code passed to gTTS as ``lang``.

    Returns a ``JsonResponse`` with status 405 for non-POST requests, 400 for
    incomplete or unsafe data, 500 on any other error, and 200 with the audio
    URL under ``audioFile`` on success.
    """
    # Imported here because only this view needs it.
    import tempfile

    if request.method != 'POST':
        return JsonResponse({'status': 'error', 'message': 'Solo se permiten solicitudes POST'}, status=405)

    try:
        data = json.loads(request.body)
        tab_id = data.get('tabId')
        title = data.get('title')
        text = data.get("text")
        node_id = data.get("nodeId")
        node_type = data.get("nodeType")

        if not tab_id or not title or not text or not node_id or not node_type:
            return JsonResponse({'status': 'error', 'message': 'Datos incompletos'}, status=400)

        # File name structure: {title}_{node_type}_{node_id}.wav
        tab_dirname = f'tab_{tab_id}'
        wav_filename = f"{title}_{node_type}_{node_id}.wav"

        # These names are built from request data; reject values that contain
        # path separators so the files cannot land outside their directories.
        if os.path.basename(tab_dirname) != tab_dirname or os.path.basename(wav_filename) != wav_filename:
            return JsonResponse({'status': 'error', 'message': 'Datos inválidos'}, status=400)

        # Tab-specific output directory.
        audio_dir = os.path.join(settings.MEDIA_ROOT, 'tabs', tab_dirname, 'audio_files')
        os.makedirs(audio_dir, exist_ok=True)
        wav_filepath = os.path.join(audio_dir, wav_filename)

        # Text to speech in the selected language.
        tts_obj = gTTS(text, lang=language, slow=False)

        # Use a unique temporary MP3 per request (a fixed file name would be
        # overwritten by concurrent requests) and always clean it up.
        fd, speech_file_path = tempfile.mkstemp(suffix='.mp3')
        os.close(fd)
        try:
            tts_obj.save(speech_file_path)

            # Speed-up: reinterpret the raw samples at a higher frame rate,
            # then resample back to the original rate.
            speed = 1.1  # fixed speed factor
            audio = AudioSegment.from_file(speech_file_path)
            faster_audio = audio._spawn(audio.raw_data, overrides={
                "frame_rate": int(audio.frame_rate * speed)
            }).set_frame_rate(audio.frame_rate)

            # Export as WAV at 8000 Hz.
            faster_audio = faster_audio.set_frame_rate(8000)
            faster_audio.export(wav_filepath, format="wav")
        finally:
            os.remove(speech_file_path)

        # Copy the result into the Asterisk sounds directory.
        asterisk_dir = '/var/lib/asterisk/sounds'
        os.makedirs(asterisk_dir, exist_ok=True)
        asterisk_filepath = os.path.join(asterisk_dir, wav_filename)
        shutil.copy(wav_filepath, asterisk_filepath)

        # URL under which the generated file is served.
        audio_file_url = os.path.join(settings.MEDIA_URL, 'tabs', f'tab_{tab_id}', 'audio_files', wav_filename)

        return JsonResponse({'status': 'success', 'audioFile': audio_file_url, 'message': 'Audio saved successfully'})

    except Exception as e:
        return JsonResponse({'status': 'error', 'message': str(e)}, status=500)
|
|
|
|
def apply_migrations_to_dynamic_db():
    """Run ``migrate`` against the ``dynamic`` database alias.

    Raises:
        Exception: if the ``migrate`` command fails; the original error is
            chained as the cause.
    """
    try:
        call_command('migrate', database='dynamic')
        print("Migraciones aplicadas con éxito a la base de datos dinámica.")
    except Exception as e:
        # Chain explicitly so the original traceback is reported as the cause.
        raise Exception(f"Error al aplicar migraciones: {str(e)}") from e
|
|
|
|
# Maps the engine key sent by the client ("bd") to the Django ENGINE path.
DB_ENGINES = dict(
    mysql='django.db.backends.mysql',
    mssql='sql_server.pyodbc',
    postgresql='django.db.backends.postgresql',
    oracle='django.db.backends.oracle',
)
|
|
|
|
def configure_dynamic_connection(db_config, db_choice):
    """Register (or replace) the ``dynamic`` database alias and test it.

    Args:
        db_config: dict with at least ``ENGINE``, ``NAME``, ``USER``,
            ``PASSWORD``, ``HOST`` and ``PORT``. Its ``OPTIONS`` entry is
            overwritten in place when ``db_choice`` is ``'mssql'`` or
            ``'mysql'``.
        db_choice: engine key; selects the engine-specific ``OPTIONS``.

    Raises:
        OperationalError: if the ``SELECT 1`` probe fails; the original error
            is chained as the cause.
    """
    alias = 'dynamic'

    # If the alias is already configured, close it and forget it entirely.
    if alias in connections.databases:
        connections[alias].close()
        # Drop the cached connection wrapper as well: it keeps a reference to
        # the old settings dict, so leaving it in place would make the new
        # configuration registered below have no effect.
        del connections[alias]
        del connections.databases[alias]

    # Engine-specific options.
    if db_choice == 'mssql':
        db_config['OPTIONS'] = {'driver': 'ODBC Driver 17 for SQL Server'}
    elif db_choice == 'mysql':
        db_config['OPTIONS'] = {'init_command': "SET sql_mode='STRICT_TRANS_TABLES'"}

    # Register the configuration under the 'dynamic' alias.
    connections.databases[alias] = {
        'ENGINE': db_config['ENGINE'],
        'NAME': db_config['NAME'],
        'USER': db_config['USER'],
        'PASSWORD': db_config['PASSWORD'],
        'HOST': db_config['HOST'],
        'PORT': db_config['PORT'],
        'OPTIONS': db_config.get('OPTIONS', {}),
        'ATOMIC_REQUESTS': db_config.get('ATOMIC_REQUESTS', False),
        'CONN_HEALTH_CHECKS': db_config.get('CONN_HEALTH_CHECKS'),
        'CONN_MAX_AGE': db_config.get('CONN_MAX_AGE', 0),
        'TIME_ZONE': db_config.get('TIME_ZONE', settings.TIME_ZONE),
        'AUTOCOMMIT': db_config.get('AUTOCOMMIT', True),
    }

    # Probe the connection so bad settings fail here rather than on first use.
    try:
        with connections[alias].cursor() as cursor:
            cursor.execute("SELECT 1")
    except OperationalError as e:
        raise OperationalError(f"Error al conectar a la BD dinámica: {str(e)}") from e
|
|
|
|
@csrf_exempt
def bd_connection(request):
    """Configure the ``dynamic`` database connection from request data.

    Expects a POST request whose JSON body contains ``bd`` (a key of
    ``DB_ENGINES``), ``name_bd``, ``user_bd``, ``password_bd``, ``host_bd`` and
    ``port_bd``. On success the configuration and the engine key are stored in
    the session under ``dynamic_db_config`` and ``db_choice``.

    Returns a ``JsonResponse`` with status 405 for non-POST requests, 400 for
    missing fields, an unsupported engine or a failed connection, 500 on any
    other error, and 200 on success.
    """
    if request.method != 'POST':
        return JsonResponse({'error': 'Método no permitido'}, status=405)

    try:
        data = json.loads(request.body)

        # Validate the submitted form data.
        required_fields = ['bd', 'name_bd', 'user_bd', 'password_bd', 'host_bd', 'port_bd']
        if not all(field in data for field in required_fields):
            return JsonResponse({'status': 'error', 'message': 'Faltan campos requeridos'}, status=400)

        # Resolve the engine before touching any existing connection, so a
        # request for an unsupported engine does not tear down a working one.
        db_choice = data['bd']
        engine = DB_ENGINES.get(db_choice)
        if not engine:
            return JsonResponse({'status': 'error', 'message': f'Motor no soportado: {db_choice}'}, status=400)

        db_config = {
            'ENGINE': engine,
            'NAME': data['name_bd'],
            'USER': data['user_bd'],
            'PASSWORD': data['password_bd'],
            'HOST': data['host_bd'],
            'PORT': data['port_bd'],
            'ATOMIC_REQUESTS': False,
            'TIME_ZONE': settings.TIME_ZONE,
            'CONN_HEALTH_CHECKS': False,
            'CONN_MAX_AGE': None,
            'OPTIONS': {},
            'AUTOCOMMIT': True,
        }

        # configure_dynamic_connection closes and replaces any previous
        # 'dynamic' connection itself, then probes the new one.
        configure_dynamic_connection(db_config, db_choice)

        # NOTE(review): this stores the database password in the session so
        # bd_query can rebuild the connection; confirm the session backend is
        # an acceptable place for credentials.
        request.session['dynamic_db_config'] = db_config
        request.session['db_choice'] = db_choice

        return JsonResponse({'status': 'success', 'message': 'Conexión exitosa'})
    except OperationalError as e:
        return JsonResponse({'status': 'error', 'message': str(e)}, status=400)
    except Exception as e:
        return JsonResponse({'status': 'error', 'message': 'Error inesperado', 'details': str(e)}, status=500)
|
|
|
|
def convert_bytes_to_str(value):
    """Return a JSON-friendly form of a binary value.

    ``bytes``, ``bytearray`` and ``memoryview`` values are decoded as UTF-8;
    if that fails, their hexadecimal representation is returned instead.
    Any other value is returned unchanged.
    """
    # Some database drivers hand back binary columns as bytearray or
    # memoryview rather than bytes; normalise them first.
    if isinstance(value, (bytearray, memoryview)):
        value = bytes(value)
    if isinstance(value, bytes):
        try:
            return value.decode('utf-8')
        except UnicodeDecodeError:
            # Not valid text: fall back to a hex string.
            return value.hex()
    return value
|
|
|
|
@csrf_exempt
def bd_query(request):
    """Run a query on the ``dynamic`` database and return its rows as JSON.

    Expects a POST request whose JSON body contains ``query``. The connection
    is rebuilt from the ``dynamic_db_config`` and ``db_choice`` values stored
    in the session. Each row is returned as a dict keyed by column name, with
    binary values converted through ``convert_bytes_to_str``. Statements that
    produce no result set yield empty ``data`` and ``columns`` lists.

    Returns a ``JsonResponse`` with status 405 for non-POST requests, 400 when
    no connection is configured, no query is given or execution fails, and 200
    on success.
    """
    if request.method != 'POST':
        return JsonResponse({'error': 'Método no permitido'}, status=405)

    dynamic_config = request.session.get('dynamic_db_config')
    db_choice = request.session.get('db_choice')

    if not dynamic_config:
        return JsonResponse({'status': 'error', 'message': 'No hay conexión configurada.'}, status=400)

    try:
        # Re-register the connection from the session data.
        configure_dynamic_connection(dynamic_config, db_choice)

        data = json.loads(request.body)
        query = data.get('query')
        if not query:
            return JsonResponse({'status': 'error', 'message': 'No se proporcionó una consulta.'}, status=400)

        # NOTE(review): the SQL text comes straight from the request body and
        # is executed as-is on a CSRF-exempt endpoint; confirm that access to
        # this view is restricted to trusted users.
        with connections['dynamic'].cursor() as cursor:
            cursor.execute(query)
            if cursor.description is None:
                # The statement produced no result set (e.g. INSERT/UPDATE).
                columns, result = [], []
            else:
                columns = [col[0] for col in cursor.description]
                result = cursor.fetchall()

        # Build one dict per row, converting binary values for JSON output.
        processed_result = [
            {name: convert_bytes_to_str(cell) for name, cell in zip(columns, row)}
            for row in result
        ]

        return JsonResponse({'status': 'success', 'data': processed_result, 'columns': columns})
    except Exception as e:
        return JsonResponse({'status': 'error', 'message': str(e), 'details': db_choice}, status=400)