You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
resumen_backoffice/resume_end_call_sapian.py

245 lines
7.5 KiB

import os
from datetime import datetime
import sys
import pymysql
import tempfile
from pydub import AudioSegment
import speech_recognition as sr
import anthropic
def consultar_claude(text):
ruta_archivo = "/home/promt_back.txt"
# Leer el contenido del archivo y asignarlo a un string
try:
with open(ruta_archivo, "r") as archivo:
contenido = archivo.read()
except FileNotFoundError:
print(f"El archivo {ruta_archivo} no existe.")
except Exception as e:
print(f"Ocurrió un error al leer el archivo: {e}")
client = anthropic.Anthropic(api_key='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')
message = client.messages.create(
model="claude-3-5-haiku-20241022",
#model = "claude-3-5-sonnet-20241022",
max_tokens=1000,
temperature=0,
system=(
"A continuación te paso el texto de una llamada y necesito un resumen completo que incluyas la calificacion del agentes con los siguientes parametros:" + contenido + "si la llamada no se puede identificar o resumir solo devuelve 4 asteriscos seguidos ****"
),
messages=[
{
"role": "user",
"content": [{"type": "text", "text": text}]
}
]
)
respuesta_claude = "".join([msg.text for msg in message.content])
# Procesar para agregar la clase TEMA a todas las celdas correspondientes
respuesta_claude = respuesta_claude.replace("<td>", "<td class='TEMA'>")
return respuesta_claude
def get_filename_from_db(recording_id):
connection = pymysql.connect(
host='sapian-ccdb-clients-sapian.service.dc-k8s-voe.consul',
user='cctel',
password='PpteK0GYyi',
database='asterisk'
)
try:
with connection.cursor() as cursor:
query = "SELECT filename FROM recording_log WHERE recording_id = %s"
cursor.execute(query, (recording_id,))
result = cursor.fetchone()
if result:
return result[0]
else:
raise ValueError("No se encontró la grabación con el recording_id proporcionado.")
finally:
connection.close()
#def split_audio(file_path, output_folder):
# audio = AudioSegment.from_file(file_path)
# duration = len(audio) # Duration in milliseconds
# chunk_duration = 60 * 1000 # 1 minute in milliseconds
# chunks = []
# for i in range(0, duration, chunk_duration):
# chunk = audio[i:i + chunk_duration]
# chunk_name = os.path.join(output_folder, f"chunk_{i//chunk_duration + 1}.wav")
# chunk.export(chunk_name, format="wav")
# chunks.append(chunk_name)
# return chunks
def split_audio(file_path, output_folder):
audio = AudioSegment.from_file(file_path)
duration = len(audio) # Duration in milliseconds
chunk_duration = 60 * 1000 # 1 minute in milliseconds
min_duration = 30 * 1000 # 30 seconds in milliseconds
chunks = []
for i in range(0, duration, chunk_duration):
chunk = audio[i:i + chunk_duration]
# Verificar si la duración del fragmento es menor a 30 seg
if len(chunk) < min_duration:
print("El último fragmento es menor a 30 segundos. Terminando el programa.")
sys.exit(0)
chunk_name = os.path.join(output_folder, f"chunk_{(i//chunk_duration) + 1}.wav")
chunk.export(chunk_name, format="wav")
chunks.append(chunk_name)
return chunks
def transcribe_audio_chunks(chunks):
recognizer = sr.Recognizer()
transcription = []
for idx, chunk in enumerate(chunks, 1):
with sr.AudioFile(chunk) as source:
audio_data = recognizer.record(source)
try:
text = recognizer.recognize_google(audio_data, language="es-ES")
transcription.append(f"Fragmento {idx}: {text}")
except sr.UnknownValueError:
transcription.append(f"Fragmento {idx}: No se pudo reconocer el audio")
except sr.RequestError as e:
transcription.append(f"Fragmento {idx}: Error en el servicio de reconocimiento ({e})")
return "\n".join(transcription)
def generate_html_summary(summary, output_path, fecha_hora):
# Define todo el CSS en una sola variable, incluyendo el estilo para las dos primeras filas
css_styles = """
body {
font-family: Arial, sans-serif;
background-color: #f4f4f4;
margin: 0;
padding: 0;
text-align: center;
position: relative;
}
h1 {
font-weight: bold;
color: #4CAF50;
margin-top: 50px;
}
p {
font-size: 18px;
line-height: 1.6;
max-width: 800px;
margin: 20px auto;
text-align: justify;
}
.logo {
position: absolute;
top: 10px;
right: 20px;
width: 250px;
height: auto;
}
table {
width: 100%;
border-collapse: collapse;
margin: 20px 0px;
font-family: Arial, sans-serif;
background-color: white;
border: 2px solid #004d40;
padding: 20px;
}
th, td {
padding: 12px;
text-align: center;
border: 1px solid #ddd;
}
th {
background-color: #004d40ff;
color: white;
font-weight: bold;
font-size: 25px;
}
td.TEMA {
font-weight: bold;
}
tr:nth-child(1) {
background-color: #4CAF50; /* Verde para las dos primeras filas */
color: white; /* Texto en blanco para contraste */
}
tr:nth-child(even):not(:nth-child(1)):not(:nth-child(2)) {
background-color: white;
}
tr:hover {
background-color: #ddd;
}
"""
# Genera el contenido HTML
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<title>Resumen de Llamada</title>
<style>
{css_styles}
</style>
</head>
<body>
<img src="SAPIANLOGO.png" alt="Logo de la Empresa" class="logo">
<h1>Resumen de Llamada</h1>
<h2 id="fecha">{fecha_hora}</h2>
<table>
{summary}
</table>
</body>
</html>
"""
# Escribir el archivo HTML en el directorio deseado
with open(output_path, "w") as file:
file.write(html_content)
def main():
if len(sys.argv) != 2:
print("Uso: python script.py <recording_id>")
sys.exit(1)
recording_id = sys.argv[1]
fecha_hora_actual = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
filename = get_filename_from_db(recording_id)
audio_path = f"/var/spool/asterisk/monitor/{filename}"+"-in.WAV"
if not os.path.exists(audio_path):
raise FileNotFoundError(f"No se encontró la grabación en la ruta: {audio_path}")
temp_dir = tempfile.gettempdir()
output_folder = os.path.join(temp_dir, "audio_chunks")
os.makedirs(output_folder, exist_ok=True)
chunks = split_audio(audio_path, output_folder)
transcription = transcribe_audio_chunks(chunks)
resumen = consultar_claude(transcription)
if resumen == "****":
sys.exit()
fecha_hora_actual = fecha_hora_actual.replace(" ", "")
output_html = f"/var/www/html/resumen_sapian_{fecha_hora_actual}_{recording_id}.html"
generate_html_summary(resumen, output_html, fecha_hora_actual)
print(f"Proceso completado. Resumen generado en: {output_html}")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()