瀏覽代碼

Commit inicial

Alejandro Rosales 1 年之前
父節點
當前提交
8f6b1f129e

二進制
__pycache__/argument_parser.cpython-39.pyc


二進制
__pycache__/data_processing.cpython-39.pyc


二進制
__pycache__/database_operations.cpython-39.pyc


二進制
__pycache__/main.cpython-39.pyc


二進制
__pycache__/selenium_setup.cpython-39.pyc


二進制
__pycache__/web_navigation.cpython-39.pyc


+ 181 - 0
app.py

@@ -0,0 +1,181 @@
+import psycopg2
+from selenium import webdriver
+from selenium.webdriver.chrome.service import Service
+from selenium.webdriver.common.by import By
+from selenium.webdriver.common.keys import Keys
+from selenium.webdriver.chrome.options import Options
+from selenium.common.exceptions import NoSuchElementException
+from selenium.webdriver.support.ui import WebDriverWait
+from selenium.webdriver.support import expected_conditions as EC
+
+import os
+from flask import Flask, request, jsonify
+from waitress import serve
+import pandas as pd
+from io import StringIO
+
+
+
+# Set options for the Chromium browser
+chrome_options = Options()
+chrome_options.add_argument("--headless")  # Optional: Run Chromium in headless mode
+chrome_options.add_argument("--no-sandbox")
+chrome_options.add_argument("--disable-dev-shm-usage")
+
+# Specify the path to the Chromium driver
+service = Service('/usr/bin/chromedriver')
+
+driver = None
+
+def extract(username: str, password: str):
+    url_credentials = f'https://{username}:{password}@sgu.ulsa.edu.mx/psulsa/alumnos/consultainformacionalumnos/consultainformacion.aspx'
+    url = 'https://sgu.ulsa.edu.mx/psulsa/alumnos/consultainformacionalumnos/consultainformacion.aspx'
+    username_integer = int(username[2:])
+    
+    def insert_alumno_extraccion(datos_html: str, materias_html: str, historial_html: str = 'error', materias_actuales_html: str = 'error'):
+        try:
+            conn = psycopg2.connect(
+                dbname=os.getenv("DBNAME"),
+                user=os.getenv("DBUSER"),
+                password=os.getenv("DBPASSWORD"),
+                host=os.getenv("DBHOST"),
+                port=os.getenv("DBPORT")
+            )
+            cur = conn.cursor()
+
+            insert_query = """
+                INSERT INTO public.alumno_extraccion ("Usuario_claveULSA", datos_html, materias_html, historial_html, materias_actuales_html) VALUES (%s, TRIM(%s), TRIM(%s), TRIM(%s)::JSONB, TRIM(%s))
+                ON CONFLICT ("Usuario_claveULSA") DO UPDATE SET datos_html = EXCLUDED.datos_html, materias_html = EXCLUDED.materias_html, error_message = NULL, registrado = DEFAULT;
+            """
+            cur.execute(insert_query, (username_integer, datos_html, materias_html, historial_html, materias_actuales_html))
+
+            conn.commit()
+            return cur.query.decode('utf-8')
+        except psycopg2.ProgrammingError as e:
+            print(f"Error de sintaxis: {e}")
+        except psycopg2.IntegrityError as e:
+            print(f"Error de integridad: {e}")
+        except Exception as e:
+            print(f"Error: {e}")
+        finally:
+            cur.close()
+            conn.close()
+        
+    def update_alumno_extraccion_error(error: str):
+        try:
+            
+            conn = psycopg2.connect(
+                dbname=os.getenv("DBNAME"),
+                user=os.getenv("DBUSER"),
+                password=os.getenv("DBPASSWORD"),
+                host=os.getenv("DBHOST"),
+                port=os.getenv("DBPORT")
+            )
+            cur = conn.cursor()
+
+            update_query = """
+                INSERT INTO public.alumno_extraccion  ("Usuario_claveULSA", error_message) VALUES (%s, %s)
+                ON CONFLICT ("Usuario_claveULSA") DO UPDATE SET error_message = EXCLUDED.error_message,
+                materias_html = DEFAULT, registrado = DEFAULT;
+            """
+            cur.execute(update_query, (username_integer, error[:255]))
+
+            conn.commit()
+            print("Data updated successfully")
+        except psycopg2.ProgrammingError as e:
+            print(f"Error de sintaxis: {e}")
+            
+        finally:
+            cur.close()
+            conn.close()
+
+    try:
+        driver.get(url_credentials)
+        driver.get(url)
+
+        # si no existe el elemento, ctl00_contenedor_control
+        datos_html = driver.find_element(By.ID, 'ctl00_contenedor_control').get_attribute('innerHTML')
+        
+        elemento = WebDriverWait(driver, 3.5).until(
+            EC.presence_of_element_located((By.ID, 'ctl00_contenedor_HistorialAlumno1_lblBtnSeccionHAcademico'))
+        )
+        elemento.click()
+        
+        # Get the HTML content of the materias element
+        materias_html = driver.find_element(By.ID, 'ctl00_contenedor_HistorialAlumno1_divHAcademico').get_attribute('innerHTML')
+        historial_html = driver.find_element(By.ID, 'ctl00_contenedor_HistorialAlumno1_gvMaterias').get_attribute('innerHTML')
+        # materias_actuales_html = driver.find_element(By.ID, 'ctl00_contenedor_HistorialAlumno1_div13').get_attribute('innerHTML')
+        
+        historial_html_io = StringIO(f"<table>{historial_html}</table>")
+        # Read the HTML table into a DataFrame
+        df = pd.read_html(historial_html_io)[0]
+
+        # Convert the DataFrame to JSON
+        json_result = df[df['GRUPO'] != 'Promedio:'].to_json(orient='records')
+        
+        # Connect to PostgreSQL database
+        query = insert_alumno_extraccion(datos_html, materias_html, json_result)
+        
+        print("Data extracted successfully")
+        return json_result
+
+    except NoSuchElementException as e:
+        update_alumno_extraccion_error(str(e))
+def se_puede_extraer():
+    try: 
+        conn = conn = psycopg2.connect(
+            dbname=os.getenv("DBNAME"),
+            user=os.getenv("DBUSER"),
+            password=os.getenv("DBPASSWORD"),
+            host=os.getenv("DBHOST"),
+            port=os.getenv("DBPORT")
+        )
+        cursor = conn.cursor()
+        
+        # SELECCIONAR ULTIMA planeacion
+        
+        
+        query = """
+                SELECT 1 
+                FROM alumno_extraccion_fecha 
+                WHERE CURRENT_DATE BETWEEN fecha_inicio AND fecha_fin 
+                ORDER BY CREATED_AT DESC 
+                LIMIT 1;
+                """
+        # Ejecuta la consulta
+        cursor.execute(query)
+        result = cursor.fetchone()
+        
+        # Verifica si se obtuvo algún resultado
+        exists = result is not None
+        
+        # Cierra el cursor y la conexión
+        cursor.close()
+        conn.close()
+        
+        return exists
+    except Exception as e:
+        print(f"Error: {e}")
+        return False
+
+app = Flask(__name__)
+
+@app.route('/calificaciones', methods=['POST'])
+def main():
+    global driver
+    # Initialize the WebDriver
+    driver = webdriver.Chrome(service=service, options=chrome_options)
+    
+    username = request.form.get('clave')
+    password = request.form.get('password')
+    if se_puede_extraer():
+        query = extract(username, password)
+    
+    # Close the session
+    driver.quit()
+    
+    return jsonify({"message": "Data extracted successfully"})
+
+if __name__ == '__main__':
+    serve(app, host='0.0.0.0', port=5000)
+    

+ 194 - 0
extraccion_html.c

@@ -0,0 +1,194 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <libpq-fe.h>
+#include <libxml/HTMLparser.h>
+#include <libxml/xpath.h>
+
+// Define the alumno struct
+struct alumno {
+    char apellido_paterno[100];
+    char apellido_materno[100];
+    char curp[20];
+    char clave_carrera[2];
+    char plan[2];
+    char clave[6];
+    char nombre[100];
+    char correo[100];
+    char estatus;
+    char telefono[11];
+    int semestre;
+    char sexo;
+};
+
+// Function to check for PostgreSQL connection errors
+void check_conn_status(PGconn *conn) {
+    if (PQstatus(conn) != CONNECTION_OK) {
+        fprintf(stderr, "Connection to database failed: %s", PQerrorMessage(conn));
+        PQfinish(conn);
+        exit(EXIT_FAILURE);
+    }
+}
+
+// Function to check for PostgreSQL query execution errors
+void check_exec_status(PGresult *res, PGconn *conn) {
+    if (PQresultStatus(res) != PGRES_TUPLES_OK) {
+        fprintf(stderr, "Query failed: %s", PQerrorMessage(conn));
+        PQclear(res);
+        PQfinish(conn);
+        exit(EXIT_FAILURE);
+    }
+}
+
+// Function to extract content from an HTML element by ID
+char* get_element_content_by_id(htmlDocPtr doc, const char *id) {
+    xmlChar xpath[100];
+    snprintf((char *)xpath, sizeof(xpath), "//*[@id='%s']", id);
+    
+    xmlXPathContextPtr xpathCtx = xmlXPathNewContext(doc);
+    xmlXPathObjectPtr xpathObj = xmlXPathEvalExpression(xpath, xpathCtx);
+    
+    if (xpathObj == NULL || xmlXPathNodeSetIsEmpty(xpathObj->nodesetval)) {
+        xmlXPathFreeObject(xpathObj);
+        xmlXPathFreeContext(xpathCtx);
+        return NULL;
+    }
+
+    xmlNodePtr node = xpathObj->nodesetval->nodeTab[0];
+    xmlChar *content = xmlNodeGetContent(node);
+
+    xmlXPathFreeObject(xpathObj);
+    xmlXPathFreeContext(xpathCtx);
+
+    return (char *)content;
+}
+
+// Function to parse HTML content using libxml2 and populate the alumno struct
+void parse_html(const char *html, struct alumno *alum) {
+    htmlDocPtr doc = htmlReadMemory(html, strlen(html), NULL, NULL, HTML_PARSE_NOERROR | HTML_PARSE_NOWARNING);
+    if (doc == NULL) {
+        fprintf(stderr, "Failed to parse HTML\n");
+        return;
+    }
+
+    char *content;
+
+    content = get_element_content_by_id(doc, "ctl00_contenedor_HistorialAlumno1_lblApPatAlumnoHP");
+    if (content) {
+        strncpy(alum->apellido_paterno, content, 100);
+        xmlFree(content);
+    }
+
+    content = get_element_content_by_id(doc, "ctl00_contenedor_HistorialAlumno1_lblApMatAlumnoHP");
+    if (content) {
+        strncpy(alum->apellido_materno, content, 100);
+        xmlFree(content);
+    }
+
+    content = get_element_content_by_id(doc, "ctl00_contenedor_HistorialAlumno1_lblCURPAlumnoHP");
+    if (content) {
+        strncpy(alum->curp, content, 20);
+        xmlFree(content);
+    }
+
+    content = get_element_content_by_id(doc, "ctl00_contenedor_HistorialAlumno1_Header1_lblCveCarrera");
+    if (content) {
+        strncpy(alum->clave_carrera, content, 2);
+        xmlFree(content);
+    }
+
+    content = get_element_content_by_id(doc, "ctl00_contenedor_HistorialAlumno1_Header1_lblAlupla");
+    if (content) {
+        strncpy(alum->plan, content, 4);
+        xmlFree(content);
+    }
+
+    content = get_element_content_by_id(doc, "ctl00_contenedor_HistorialAlumno1_Header1_lblCveUlsa");
+    if (content) {
+        strncpy(alum->clave, content, 7);
+        xmlFree(content);
+    }
+
+    content = get_element_content_by_id(doc, "ctl00_contenedor_HistorialAlumno1_lblNombreAlumnoHP");
+    if (content) {
+        strncpy(alum->nombre, content, 100);
+        xmlFree(content);
+    }
+
+    content = get_element_content_by_id(doc, "ctl00_contenedor_HistorialAlumno1_lblCorreoAlumnoHP");
+    if (content) {
+        strncpy(alum->correo, content, 100);
+        xmlFree(content);
+    }
+
+    content = get_element_content_by_id(doc, "ctl00_contenedor_HistorialAlumno1_Header1_lblStat");
+    if (content) {
+        alum->estatus = content[0];
+        xmlFree(content);
+    }
+
+    content = get_element_content_by_id(doc, "ctl00_contenedor_HistorialAlumno1_lblTelefonoAlumnoHP");
+    if (content) {
+        strncpy(alum->telefono, content, 11);
+        xmlFree(content);
+    }
+
+    content = get_element_content_by_id(doc, "ctl00_contenedor_HistorialAlumno1_Header1_lblSem");
+    if (content) {
+        alum->semestre = atoi(content);
+        xmlFree(content);
+    }
+
+    content = get_element_content_by_id(doc, "ctl00_contenedor_HistorialAlumno1_lblSexoAlumnoHP");
+    if (content) {
+        alum->sexo = content[0];
+        xmlFree(content);
+    }
+
+    xmlFreeDoc(doc);
+}
+
+int main() {
+    // PostgreSQL connection parameters
+    const char *conninfo = "dbname=sgi user=postgres password=h3rcul3s#$ hostaddr=200.13.89.8 port=5432";
+    PGconn *conn = PQconnectdb(conninfo);
+
+    // Check connection status
+    check_conn_status(conn);
+
+    // Execute SQL query to retrieve HTML content
+    PGresult *res = PQexec(conn, "SELECT datos_html FROM public.alumno_extraccion WHERE error_message IS NULL");
+    check_exec_status(res, conn);
+
+    // Process each row
+    int rows = PQntuples(res);
+    for (int i = 0; i < rows; i++) {
+        char *html_content = PQgetvalue(res, i, 0);
+        // printf("HTML Content: %s\n", html_content);
+
+        struct alumno alum;
+        memset(&alum, 0, sizeof(alum));  // Initialize the struct to zero
+
+        parse_html(html_content, &alum);
+
+        printf("Apellido Paterno: %s\n", alum.apellido_paterno);
+        printf("Apellido Materno: %s\n", alum.apellido_materno);
+        printf("CURP: %s\n", alum.curp);
+        printf("Clave Carrera: %s\n", alum.clave_carrera);
+        printf("Plan: %s\n", alum.plan);
+        printf("Clave: %s\n", alum.clave);
+        printf("Nombre: %s\n", alum.nombre);
+        printf("Correo: %s\n", alum.correo);
+        printf("Estatus: %c\n", alum.estatus);
+        printf("Telefono: %s\n", alum.telefono);
+        printf("Semestre: %d\n", alum.semestre);
+        printf("Sexo: %c\n", alum.sexo);
+    }
+
+    // Clean up
+    PQclear(res);
+    PQfinish(conn);
+
+    return 0;
+}
+

File diff suppressed because it is too large
+ 48 - 0
ian_tabla.html


二進制
lib/__pycache__/argument_parser.cpython-36.pyc


二進制
lib/__pycache__/argument_parser.cpython-39.pyc


二進制
lib/__pycache__/data_processing.cpython-36.pyc


二進制
lib/__pycache__/data_processing.cpython-39.pyc


二進制
lib/__pycache__/database_operations.cpython-36.pyc


二進制
lib/__pycache__/database_operations.cpython-39.pyc


二進制
lib/__pycache__/funciones.cpython-36.pyc


二進制
lib/__pycache__/funciones.cpython-39.pyc


二進制
lib/__pycache__/selenium_setup.cpython-36.pyc


二進制
lib/__pycache__/selenium_setup.cpython-39.pyc


二進制
lib/__pycache__/web_navigation.cpython-36.pyc


二進制
lib/__pycache__/web_navigation.cpython-39.pyc


+ 17 - 0
lib/argument_parser.py

@@ -0,0 +1,17 @@
+import argparse
+import getpass
+
+def parse_arguments():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("clave", help="Clave ULSA argument")
+    args = parser.parse_args()
+
+    clave = args.clave
+    if not clave.startswith('al') or not clave[2:].isdigit() or len(clave) != 8:
+        raise ValueError("Clave no válida. Debe comenzar con 'al' y tener 6 dígitos.")
+    
+    contraseña = getpass.getpass("Contraseña: ")
+    if not clave or not contraseña:
+        raise ValueError("Clave y/o contraseña no válidos")
+    
+    return clave, contraseña

+ 53 - 0
lib/data_processing.py

@@ -0,0 +1,53 @@
+from bs4 import BeautifulSoup
+
+def process_html(html_doc):
+    """
+    Procesa el HTML de la página y extrae la información relevante
+    como materias, estados del servicio social, etc.
+    """
+    soup = BeautifulSoup(html_doc, 'lxml')
+    table = soup.find('table', attrs={'id': 'ctl00_contenedor_HistorialAlumno1_gvMaterias'})
+    
+    actualmente_cursadas = soup.find('table', attrs={'id': 'ctl00_contenedor_HistorialAlumno1_gvMatOrdinario'})
+    
+    if table is None:
+        raise Exception("Tabla no encontrada en la página")
+    elif actualmente_cursadas is None:
+        raise Exception("Tabla de materias actualmente cursadas no encontrada en la página")
+
+    materias_sgu = []
+    headers = [header.text for header in table.find_all('th')]
+
+    for row in table.find_all('tr'):
+        cols = row.find_all('td')
+        if cols and not any(col.text == "Promedio:" for col in cols):
+            materias_sgu.append({ headers[i]: col.text for i, col in enumerate(cols) if not is_cell_empty(col.text) })
+    materias_actualmente_cursadas = []
+    headers = [header.text for header in actualmente_cursadas.find_all('th')]
+    for row in actualmente_cursadas.find_all('tr'):
+        cols = row.find_all('td')
+        if cols and not any(col.text == "No hay Datos" for col in cols):
+            materias_actualmente_cursadas.append({ headers[i]: col.text for i, col in enumerate(cols) if not is_cell_empty(col.text) })
+        
+    periodo_actual = soup.find('span', attrs={'id': 'ctl00_contenedor_HistorialAlumno1_Header1_lblPeriodo'})
+    grupo_actual = soup.find('span', attrs={'id': 'ctl00_contenedor_HistorialAlumno1_Header1_lblgru'})
+    
+    return clean_materias_sgu(materias_sgu), clean_materias_sgu(materias_actualmente_cursadas), f"'{periodo_actual.text}'", f"'{grupo_actual.text}'"
+
+def is_cell_empty(cell_content):
+    """
+    Verifica si el contenido de una celda está vacío o contiene solo espacios.
+    """
+    return not cell_content.strip() or cell_content == u'\xa0'
+
+def clean_materias_sgu(materias_sgu):
+    """
+    Limpia y ajusta las materias obtenidas del SGU para su procesamiento posterior.
+    """
+    for materia in materias_sgu:
+        if '\xa0' in materia:
+            materia['SEMESTRE'] = materia.pop('\xa0')
+    return materias_sgu
+
+# Puedes agregar más funciones según sea necesario para procesar otros aspectos del HTML
+

+ 47 - 0
lib/database_operations.py

@@ -0,0 +1,47 @@
+import psycopg2
+from psycopg2.extras import DictCursor
+
+def connect_to_database(dbname="sgi", user="postgres", password="h3rcul3s#$", host="200.13.89.8", port="5432"):
+    """
+    Establece una conexión a la base de datos y la retorna.
+    """
+    try:
+        connection = psycopg2.connect(
+            dbname=dbname,
+            user=user,
+            password=password,
+            host=host,
+            port=port
+        )
+        return connection
+    except psycopg2.Error as e:
+        print(f"No se pudo conectar a la base de datos: {e}")
+        exit()
+
+def query_all(sql):
+    with connect_to_database() as conn:
+        with conn.cursor(cursor_factory=DictCursor) as cur:
+            cur.execute(sql)
+            return cur.fetchall()
+
+def query_single(sql):
+    with connect_to_database() as conn:
+        with conn.cursor(cursor_factory=DictCursor) as cur:
+            cur.execute(sql)
+            return cur.fetchone()  # Returns a dictionary-like object
+
+def execute_query(sql):
+    with connect_to_database() as conn:
+        with conn.cursor() as cur:
+            cur.execute(sql)
+            conn.commit()  # Commit to save the insert operation
+
+def log(message, status, error_message=None, clave=None, no_insertadas=[]):
+    with connect_to_database(dbname="adcfi", user="postgres", password="Ultr4p0d3r0s0##", host="200.13.89.42", port="5432") as conn:
+        with conn.cursor() as cur:
+            cur.execute(f"""
+                        INSERT INTO calificaciones.calificaciones_log (response_status, error_message, user_ip, additional_info)
+                        VALUES ({status}, {f"'{error_message}'" if error_message else 'NULL'}, '200.13.89.42', '{{"clave": "{clave}", "message": "{message}", "no insertadas": "[{', '.join(no_insertada for no_insertada in no_insertadas)}]"}}')
+                        """)
+            conn.commit()
+

+ 99 - 0
lib/funciones.py

@@ -0,0 +1,99 @@
+from database_operations import query_all, query_single, execute_query
+
+def actualizar_servicio(clave, Alumno_serviciosocial):
+    # Actualizar servicio social
+    execute_query(f'''
+        UPDATE "Alumno" SET "Alumno_serviciosocial" = {Alumno_serviciosocial} WHERE "Usuario_claveULSA" = {int(clave[2:])}
+    ''')
+
+def get_periodos(materias_sgu):
+    return query_all(f'''SELECT * FROM "Periodo" WHERE "Periodo_shortname" IN ({",".join(f"'{materia['PERIODO']}'" for materia in materias_sgu)})''')
+
+def get_materias(materias_sgu):
+    return query_all('SELECT * FROM "Materia"')
+def get_tipo_calificaciones(materias_sgu):
+    return query_all(f'''SELECT * FROM "TipoCalificacion"''')
+def get_alumno(clave):
+    return query_single(f'SELECT "Carrera_id", "PlanEstudio_id" FROM "Alumno_view" WHERE "Usuario_claveULSA" = {clave[2:]}')
+def get_grupo(grupo, carrera_id):
+    return query_single(f'''SELECT "Grupo_id" FROM "Grupo_view"
+        WHERE REGEXP_REPLACE("Grupo_desc", '[^\d]', '', 'g') = '{grupo}' AND (("Carrera_id" = {carrera_id}) OR "Carrera_esComun") 
+    ''')
+def insert_materia(clave, materia_id, periodo_id, grupo_id):
+    execute_query(f'''INSERT INTO public."Alumno_Materia"("Usuario_claveULSA", "Materia_id", "Periodo_id", "Grupo_id")
+        VALUES ({int(clave[2:])}, {materia_id}, {periodo_id}, {grupo_id})
+        ON CONFLICT ("Usuario_claveULSA", "Materia_id", "Periodo_id") DO NOTHING;
+    ''')
+def insert_calificaciones(calificaciones):
+    execute_query(f'''
+        insert into "Alumno_Materia_Calificacion"
+        ("Usuario_claveULSA", "Materia_id", "Periodo_id", "TipoCalificacion_id", "Calificacion_calif", "Calificacion_fecha", "Calificacion_comentario")
+        values {','.join(calificaciones)}
+        on conflict ("Usuario_claveULSA", "Materia_id", "Periodo_id", "TipoCalificacion_id")
+        DO UPDATE SET "Calificacion_calif" = EXCLUDED."Calificacion_calif", "Calificacion_comentario" = EXCLUDED."Calificacion_comentario";
+        ''')
+
+
+#alumno structure {apellido_paterno: str, apellido_materno: str, curp: str, clave_carrera: int, plan: int, clave: int, servicio_social: bool, nombre: str, correo: str}
+def insert_alumno(alumno):
+    alumno_base = query_single(f'SELECT "Carrera_id", "PlanEstudio_id" FROM "Alumno_view" WHERE "Usuario_claveULSA" = {alumno["clave"]}')
+    if alumno_base:
+        return alumno_base
+    
+    usuario_base = query_single(f"""SELECT * FROM "Usuario" WHERE "Usuario_curp" = '{alumno["curp"]}'""")
+    plan_estudio_base = query_single(f"""SELECT * FROM "PlanEstudio_view" WHERE "PlanEstudio_desc" LIKE '%{alumno["plan"]}' AND "Carrera_clave" = {alumno["clave_carrera"]}""")
+    
+    if usuario_base:
+        execute_query(f'''
+            INSERT INTO public."Alumno"("Usuario_claveULSA", "Usuario_id", "PlanEstudio_id", "Alumno_fecha_ingreso", "Alumno_generacion", "Alumno_serviciosocial")
+            VALUES ({alumno["clave"]}, {usuario_base["Usuario_id"]}, {plan_estudio_base["PlanEstudio_id"]}, '{alumno['fecha_ingreso']}', '{alumno['fecha_ingreso']}', {alumno["servicio_social"]})
+            ON CONFLICT ("Usuario_claveULSA") DO NOTHING;
+        ''')
+        execute_query(f'''
+            INSERT INTO "Alumno_SubEstadoAlumno" ("SubEstadoAlumno_id", "Usuario_claveULSA", "SEA_fecha", "SEA_actual")
+			VALUES (3, {alumno["clave"]},'{alumno['fecha_ingreso']}', true)
+        ''')
+        
+        alumno_base = query_single(f'SELECT "Carrera_id", "PlanEstudio_id" FROM "Alumno_view" WHERE "Usuario_claveULSA" = {alumno["clave"]}')
+        
+        return alumno_base
+    
+    usuario_base = query_single(f'''
+        INSERT INTO public."Usuario"("Usuario_nombre", "Usuario_apellidos", "Usuario_curp")
+        VALUES ('{alumno["nombre"]}', '{alumno["apellido_paterno"]} {alumno["apellido_materno"]}', '{alumno["curp"]}') RETURNING "Usuario_id"
+    ''')
+    execute_query(f'''
+        INSERT INTO public."Alumno"("Usuario_claveULSA", "Usuario_id", "PlanEstudio_id", "Alumno_fecha_ingreso", "Alumno_generacion", "Alumno_serviciosocial")
+        VALUES ({alumno["clave"]}, {usuario_base["Usuario_id"]}, {plan_estudio_base["PlanEstudio_id"]}, '{alumno['fecha_ingreso']}', '{alumno['fecha_ingreso']}', {alumno["servicio_social"]})
+        ON CONFLICT ("Usuario_claveULSA") DO NOTHING;
+    ''')
+    alumno_base = query_single(f'SELECT "Carrera_id", "PlanEstudio_id" FROM "Alumno_view" WHERE "Usuario_claveULSA" = {alumno["clave"]}')
+        
+    return alumno_base
+
+# insert actualmente_cursadas
+def insert_actualmente_cursadas(clave, actualmente_cursadas, periodo_actual, grupo_actual):
+    grupo_base = query_single(f'SELECT "Grupo_id" FROM "Grupo" WHERE "Grupo_desc" = {grupo_actual}')
+    periodo_base = query_single(f'SELECT "Periodo_id" FROM "Periodo" WHERE "Periodo_shortname" = {periodo_actual}')
+    # only where materia["Clave"] is set
+    for materia in filter(lambda materia: "Clave" in materia, actualmente_cursadas):
+        # to varchar
+        materia_clave = f"'{materia['Clave']}'"
+        materia_base = query_single(f'SELECT "Materia_id" FROM "Materia" WHERE  {materia_clave} = ANY("Materia_claves")')
+        if materia_base:
+            execute_query(f'''
+                INSERT INTO public."Alumno_Materia"("Usuario_claveULSA", "Materia_id", "Periodo_id", "Grupo_id")
+                VALUES ({clave[2:]}, {materia_base["Materia_id"]}, {periodo_base["Periodo_id"]}, {grupo_base["Grupo_id"]})
+                ON CONFLICT ("Usuario_claveULSA", "Materia_id", "Periodo_id") DO NOTHING;
+            ''')
+
+
+def insert_datos(alumno):
+    execute_query(f'''
+        INSERT INTO public."pos_ultima_extraccion"("Usuario_claveULSA", "telefono", "correo", "estatus", "promedio", "sexo", "semestre")
+        VALUES ({alumno["clave"]}, '{alumno["telefono"]}', '{alumno["correo"]}', '{alumno["estatus"]}', {alumno["promedio"]} , '{alumno["sexo"]}', {alumno["semestre"]})
+        ON CONFLICT ("Usuario_claveULSA") DO UPDATE SET "telefono" = EXCLUDED."telefono", "correo" = EXCLUDED."correo", "estatus" = EXCLUDED."estatus", "promedio" = EXCLUDED."promedio",
+        "actualizacion" = now();
+    ''')
+    
+    

+ 35 - 0
lib/log.py

@@ -0,0 +1,35 @@
+import psycopg2
+from psycopg2.extras import DictCursor
+
+def connect_to_database(dbname="sgi", user="postgres", password="sys4lci", host="200.13.89.27", port="5432"):
+    """
+    Establece una conexión a la base de datos y la retorna.
+    """
+    connection = psycopg2.connect(
+        dbname=dbname,
+        user=user,
+        password=password,
+        host=host,
+        port=port
+    )
+    return connection
+
+
+def query_all(sql):
+    with connect_to_database() as conn:
+        with conn.cursor(cursor_factory=DictCursor) as cur:
+            cur.execute(sql)
+            return cur.fetchall()
+
+def query_single(sql):
+    with connect_to_database() as conn:
+        with conn.cursor(cursor_factory=DictCursor) as cur:
+            cur.execute(sql)
+            return cur.fetchone()  # Returns a dictionary-like object
+
+def execute_query(sql):
+    with connect_to_database() as conn:
+        with conn.cursor() as cur:
+            cur.execute(sql)
+            conn.commit()  # Commit to save the insert operation
+

+ 16 - 0
lib/selenium_setup.py

@@ -0,0 +1,16 @@
+from selenium import webdriver
+from selenium.webdriver.chrome.service import Service
+from selenium.webdriver.chrome.options import Options
+
+def configure_selenium():
+    options = Options()
+    options.add_argument("--headless")
+    options.add_argument("--disable-gpu")
+    options.add_argument("--no-sandbox")
+    options.add_argument("--disable-dev-shm-usage")
+    options.add_argument("--window-size=1920x1080")
+
+    PATH = "/usr/bin/chromedriver"
+    service = Service(PATH)
+    driver = webdriver.Chrome(service=service, options=options)
+    return driver

+ 54 - 0
lib/web_navigation.py

@@ -0,0 +1,54 @@
+from selenium.webdriver.common.by import By
+from selenium.webdriver.support.ui import WebDriverWait
+from selenium.webdriver.support import expected_conditions as EC
+
+def navigate_to_url(driver, url, clave, contraseña):
+    formatted_url = f"https://{clave}:{contraseña}@{url}"
+    driver.get(formatted_url)
+    driver.get(f'https://{url}')
+    # If dentro del código no existe un elemento con el id ctl00_lnkHome
+    if not WebDriverWait(driver, 10).until(
+        EC.presence_of_element_located((By.ID, "ctl00_lnkHome"))
+    ):
+        raise Exception("No se pudo iniciar sesión.")
+    
+    # wait until it appears this element.id = ctl00_contenedor_HistorialAlumno1_lblApPatAlumnoHP and get it
+    
+    def wait_for_element(driver, element_id):
+        return WebDriverWait(driver, 10).until(
+            EC.presence_of_element_located((By.ID, element_id))
+        )
+    
+    servicio_social = wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_Header1_lblSS")
+    alumno = {
+        "apellido_paterno": wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_lblApPatAlumnoHP").text,
+        "apellido_materno": wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_lblApMatAlumnoHP").text,
+        "curp": wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_lblCURPAlumnoHP").text,
+        "clave_carrera": int(wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_Header1_lblCveCarrera").text),
+        "plan": int(wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_Header1_lblAlupla").text),
+        "clave": int(wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_Header1_lblCveUlsa").text),
+        "servicio_social": servicio_social.text == "Realizado",
+        "nombre":wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_lblNombreAlumnoHP").text,
+        "correo": wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_lblCorreoAlumnoHP").text,
+        'estatus' : wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_Header1_lblStat").text,
+        "telefono": wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_lblTelefonoAlumnoHP").text,
+        "semestre": int(wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_Header1_lblSem").text),
+        "sexo": wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_lblSexoAlumnoHP").text,
+    }
+    
+    
+    click_element(driver, element_id="ctl00_contenedor_HistorialAlumno1_lblBtnSeccionHAcademico")
+    
+    historial_academico = driver.page_source
+    
+    alumno['promedio'] = float(wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_lblPromedioAlumnoHA").text)
+
+    return alumno, historial_academico
+
+def click_element(driver, element_id):
+    elemento = WebDriverWait(driver, 10).until(
+        EC.presence_of_element_located((By.ID, element_id))
+    )
+    elemento.click()
+
+

+ 16 - 0
logs.py

@@ -0,0 +1,16 @@
+import psycopg2
+from psycopg2.extras import DictCursor
+
+conn = psycopg2.connect(
+    dbname='adcfi',
+    user='postgres',
+    password='Ultr4p0d3r0s0##',
+    host='200.13.89.42',
+    port='5432'
+)
+
+with conn.cursor(cursor_factory=DictCursor) as cur:
+    cur.execute("SELECT * FROM calificaciones.calificaciones_log ORDER BY log_timestamp DESC LIMIT 10")
+
+    for row in cur:
+        print(row)

+ 105 - 0
main.py

@@ -0,0 +1,105 @@
+import sys
+from pathlib import Path
+from flask import Flask, request, jsonify
+from waitress import serve
+
+app = Flask(__name__)
+
+# Agregar el directorio lib al path de Python
+lib_path = Path(__file__).parent / 'lib'
+sys.path.append(str(lib_path))
+
+# Ahora puedes importar los módulos desde el directorio lib
+from selenium_setup import configure_selenium
+from argument_parser import parse_arguments
+from web_navigation import navigate_to_url, click_element
+from data_processing import process_html
+from database_operations import log
+from funciones import *
+
+def main(clave, contraseña):
+    # Configurar Selenium y navegar a la URL
+    driver = configure_selenium()
+    url = "sgu.ulsa.edu.mx/psulsa/alumnos/consultainformacionalumnos/consultainformacion.aspx"
+    datos_alumno, historial_academico = navigate_to_url(driver, url, clave, contraseña)
+    materias_sgu, actualmente_cursadas, periodo_actual, grupo_actual = process_html(historial_academico)
+    insert_actualmente_cursadas(clave, actualmente_cursadas, periodo_actual, grupo_actual)
+
+    # Obtener datos de la base
+    periodos_base = get_periodos(materias_sgu)
+    
+    # obtener la fecha mínima del arreglo de diccionarios de periodos en su campo "Periodo_fecha_inicio"
+    fecha_mínima = min(periodos_base, key=lambda x: x['Periodo_fecha_inicial'])['Periodo_fecha_inicial']
+    # déjalo en el primer día del mes
+    datos_alumno['fecha_ingreso'] = fecha_mínima.replace(day=1)
+    
+    materias_base = get_materias(materias_sgu)
+    tipo_calificaciones_base = get_tipo_calificaciones(materias_sgu)
+    alumno_base = insert_alumno(datos_alumno)
+    
+    # Actualizar servicio social
+    actualizar_servicio(clave, datos_alumno['servicio_social'])
+    
+    #insert datos
+    insert_datos(datos_alumno)
+    
+    calificaciones = []
+    no_insertadas = []
+    
+    for materia_sgu in materias_sgu:
+        materia_base = next((materia_base for materia_base in materias_base if materia_sgu['Cve ULSA'] in materia_base['Materia_claves'] or (materia_sgu['Cve SEP'] in materia_base["Materia_claves"] and alumno_base['PlanEstudio_id'] == materia_base['PlanEstudio_id'])), None)
+        periodo_base = next((periodo_base for periodo_base in periodos_base if periodo_base['Periodo_shortname'] == materia_sgu['PERIODO']), None)
+        tipo_calificacion_base = next((calificacion_base for calificacion_base in tipo_calificaciones_base if calificacion_base['TipoCalificacion_desc_corta'] == materia_sgu['EXAMEN']), None)
+        if 'GRUPO' in materia_sgu.keys():
+            grupo = get_grupo(materia_sgu['GRUPO'], alumno_base['Carrera_id'])
+        else:
+            grupo = None
+            
+        if materia_base and periodo_base and tipo_calificacion_base:
+            calificaciones.append(f"({clave[2:]}, {materia_base['Materia_id']}, {periodo_base['Periodo_id']}, {tipo_calificacion_base['TipoCalificacion_id']}, {materia_sgu['CALIF']}, CURRENT_DATE, 'SGU')")
+
+        if not materia_base or not periodo_base or not tipo_calificacion_base or not grupo:
+            no_insertadas.append(f'''Materia: {materia_base['Materia_id'] if materia_base else materia_sgu['Cve ULSA']} - Periodo_base: {periodo_base['Periodo_id'] if periodo_base else materia_sgu['PERIODO']} Tipo_calificacion_base: {tipo_calificacion_base['TipoCalificacion_id'] if tipo_calificacion_base else materia_sgu['EXAMEN']} Grupo: {grupo['Grupo_id'] if grupo else materia_sgu['GRUPO'] if 'GRUPO' in materia_sgu.keys() else 'None' } ''')
+            continue
+        
+        insert_materia(clave, materia_base['Materia_id'], periodo_base['Periodo_id'], grupo['Grupo_id'])
+        
+
+    if not calificaciones or len(calificaciones) == 0:
+        raise Exception("No hay calificaciones para insertar o actualizar.")
+    
+    # Insertar calificaciones
+    insert_calificaciones(calificaciones)
+    return clave, no_insertadas
+
+@app.route('/calificaciones', methods=['POST'])
+def calificaciones():
+    try:
+        # Obtener la clave y la contraseña de la solicitud POST
+        clave = request.form.get('clave')
+        contraseña = request.form.get('contraseña')
+
+        # Verificar si la clave y la contraseña existen
+        if clave is None or contraseña is None:
+            return "Error: La clave y/o contraseña no fueron proporcionadas en la solicitud."
+
+        # Procesar los datos (aquí llamamos a la función main)
+        clave, no_insertadas = main(clave, contraseña)
+
+        # Registro de éxito
+        log("Proceso terminado con éxito.", 200, None, clave, no_insertadas)
+
+        # Retornar respuesta exitosa como JSON
+        return jsonify({"mensaje": "Proceso terminado con éxito.", "clave": clave, "no_insertadas": no_insertadas, "success": True})
+
+    except Exception as e:
+        # remove all ' from the error message
+        e = e.replace("'", "")
+        log(str(e), 500, e, None)
+
+        # Retornar mensaje de error como JSON
+        return jsonify({"mensaje": str(e), "success": False})
+        
+
+if __name__ == "__main__":
+    serve(app, host='0.0.0.0', port=5000)

二進制
proceso


+ 217 - 0
sgu.py

@@ -0,0 +1,217 @@
+import numpy as np
+from bs4 import BeautifulSoup
+import psycopg2
+from selenium import webdriver
+from selenium.webdriver.chrome.service import Service
+from selenium.webdriver.chrome.options import Options
+from selenium.webdriver.common.by import By
+from selenium.webdriver.support.ui import WebDriverWait
+from selenium.webdriver.support import expected_conditions as EC
+import argparse
+import getpass
+
+try:
+    # Configuración de Selenium
+    options = Options()
+    options.add_argument("--headless")
+    options.add_argument("--disable-gpu")
+    options.add_argument("--no-sandbox")
+    options.add_argument("--disable-dev-shm-usage")
+    options.add_argument("--window-size=1920x1080")
+    
+    PATH = "/usr/bin/chromedriver"
+    service = Service(PATH)
+    driver = webdriver.Chrome(service=service, options=options)
+except Exception as e:
+    print(f"Error configurando el navegador: {e}")
+    exit()
+
+try:
+    # Parseo de argumentos
+    parser = argparse.ArgumentParser()
+    parser.add_argument("clave", help="Clave ULSA argument")
+    args = parser.parse_args()
+    
+    clave = args.clave
+    # Solicitar la contraseña de manera segura
+    contraseña = getpass.getpass("Contraseña: ")
+    
+    if not clave or not contraseña:
+        raise ValueError("Clave y/o contraseña no válidos")
+except Exception as e:
+    print(f"Error en los argumentos: {e}")
+    driver.quit()
+    exit()
+
+
+try:
+    # Navegar a la URL
+    url = "sgu.ulsa.edu.mx/psulsa/alumnos/consultainformacionalumnos/consultainformacion.aspx"
+    formatted_url = f"https://{clave}:{contraseña}@{url}"
+    driver.get(formatted_url)
+    driver.get(f'https://{url}')
+except Exception as e:
+    print(f"Error navegando a la URL: {e}")
+    driver.quit()
+    exit()
+
+try:
+    # If dentro del código existe un (case insensitive) Unauthorized
+    if "Unauthorized" in driver.page_source:
+        raise Exception("Credenciales inválidas")
+
+    elemento = WebDriverWait(driver, 10).until(
+        EC.presence_of_element_located((By.ID, "ctl00_contenedor_HistorialAlumno1_lblBtnSeccionHAcademico"))
+    )
+    elemento.click()
+except Exception as e:
+    print(f"Error interactuando con la página: {e}")
+    driver.quit()
+    exit()
+
+try:
+    # Procesamiento de HTML con BeautifulSoup
+    html_doc = driver.page_source
+    soup = BeautifulSoup(html_doc, 'lxml')
+    table = soup.find('table', attrs={'id': 'ctl00_contenedor_HistorialAlumno1_gvMaterias'})
+    
+    if table is None:
+        raise Exception("Tabla no encontrada en la página")
+    
+    materias_sgu = []
+
+    headers = [header.text for header in table.find_all('th')]
+
+    def is_cell_empty(cell_content):
+        return not cell_content.strip() or cell_content == u'\xa0'
+
+    for row in table.find_all('tr'):
+        cols = row.find_all('td')
+        if cols and not any(is_cell_empty(col.text) for col in cols):
+            materias_sgu.append({headers[i]: col.text for i, col in enumerate(cols)})
+    
+    for materia in materias_sgu:
+        materia['SEMESTRE'] = materia.pop('\xa0')
+
+
+    servicio_social = soup.find('span', attrs={'id': 'ctl00_contenedor_HistorialAlumno1_Header1_lblSS'}).text
+    # puede ser Realizado, No Realizado
+    if servicio_social == "No Realizado":
+        Alumno_serviciosocial = False
+    elif servicio_social == "Realizado":
+        Alumno_serviciosocial = True
+except Exception as e:
+    print(f"Error procesando HTML: {e}")
+    exit()
+finally:
+    driver.quit()
+
+# Conexión a la base de datos y operaciones
+conexion = psycopg2.connect(
+    dbname="sgi",
+    user="postgres",
+    password="sys4lci",
+    host="200.13.89.27",
+    port="5432"
+)
+
+# Ejecutar la consulta para actualizar public."Alumno"."Alumno_serviciosocial"
+try:
+    with conexion.cursor() as cursor:
+        cursor.execute(f'UPDATE public."Alumno" SET "Alumno_serviciosocial" = {Alumno_serviciosocial} WHERE "Usuario_claveULSA" = \'{clave[2:]}\'')
+    conexion.commit()
+except Exception as e:
+    print(f"Error al actualizar el servicio social: {e}")
+    if conexion:
+        conexion.rollback()
+    
+try:
+    # Ejecutar la consulta para obtener los periodos
+    with conexion.cursor() as cursor:
+        cursor.execute('SELECT * FROM "Periodo"')
+        Periodos = cursor.fetchall()
+except Exception as e:
+    print(f"Error obteniendo los periodos de la base de datos: {e}")
+
+try:
+    # Ejecutar la consulta para obtener los tipos de calificación
+    with conexion.cursor() as cursor:
+        cursor.execute('SELECT * FROM "TipoCalificacion"')
+        TiposCalificacion = cursor.fetchall()
+except Exception as e:
+    print(f"Error obteniendo los tipos de calificación de la base de datos: {e}")
+
+try:
+    # Ejecutar la consulta para obtener las materias base
+    with conexion.cursor() as cursor:
+        cursor.execute('SELECT * FROM "Materia"')
+        materias_base = cursor.fetchall()
+except Exception as e:
+    print(f"Error obteniendo las materias de la base de datos: {e}")
+try:
+    # Procesar y preparar las calificaciones para inserción/actualización
+    calificaciones = []
+    with conexion.cursor() as cursor:
+        cursor.execute(f'''
+        SELECT "Carrera_id", "PlanEstudio_id"
+        FROM "Alumno_view"
+        WHERE "Usuario_claveULSA" = {clave[2:]}
+        ''')
+        Alumno_base = cursor.fetchone()
+    for materia_sgu in materias_sgu:
+        materia_base = next((materia_base for materia_base in materias_base if (materia_sgu['Cve ULSA'] in materia_base[-1] or materia_sgu['Cve SEP'] in materia_base[-1]) and (Alumno_base[1] == materia_base[0])), None)
+        if not materia_base:
+            continue
+
+        Periodo_base = next((Periodo_base for Periodo_base in Periodos if Periodo_base[-2] == materia_sgu['PERIODO']), None)
+        if not Periodo_base:
+            continue
+
+        Calificacion_base = next((Calificacion_base for Calificacion_base in TiposCalificacion if Calificacion_base[-2] == materia_sgu['EXAMEN']), None)
+        if not Calificacion_base:
+            raise Exception(f"No se encontró el tipo de calificación {materia_sgu['EXAMEN']} en la base de datos")
+        
+        # buscar en la base de datos el grupo WHERE Grupo_desc = materia_sgu['GRUPO'] and if is not in the base insert it (note: semestre when printed in the console is '\xa0': '5')
+        with conexion.cursor() as cursor:
+            cursor.execute(f'''
+            SELECT "Grupo_id"
+            FROM "Grupo_view"
+            WHERE REGEXP_REPLACE("Grupo_desc", '[^\d]', '', 'g') = '{materia_sgu["GRUPO"]}' AND (("Carrera_id" = {Alumno_base[0]}) OR "Carrera_esComun") 
+            ''')
+            Grupo = cursor.fetchone()
+            if Grupo:
+                cursor.execute(f'''
+                INSERT INTO public."Alumno_Materia"("Usuario_claveULSA", "Materia_id", "Periodo_id", "Grupo_id")
+                VALUES ({clave[2:]}, {materia_base[0]}, {Periodo_base[0]}, {Grupo[0]})
+                ON CONFLICT ("Usuario_claveULSA", "Materia_id", "Periodo_id") DO NOTHING
+                ;
+                ''')
+
+            conexion.commit()
+
+        if not materia_base or not Periodo_base or not Calificacion_base:
+            print(f"No se encontraron coincidencias para la materia {materia_sgu['Cve ULSA']} o el periodo {materia_sgu['PERIODO']} o el tipo de calificación {materia_sgu['EXAMEN']}")
+            continue  # Saltar esta iteración si alguna coincidencia falla
+        calificaciones.append(f"({clave[2:]}, {materia_base[0]}, {Periodo_base[0]}, {Calificacion_base[0]}, {materia_sgu['CALIF']}, CURRENT_DATE, 'SGU')")
+    
+    if not calificaciones or len(calificaciones) == 0:
+        raise Exception("No hay calificaciones para insertar o actualizar.")
+    
+    # Inserción/actualización de calificaciones en la base de datos
+    with conexion.cursor() as cursor:
+        cursor.execute(f'''
+        insert into "Alumno_Materia_Calificacion"
+        ("Usuario_claveULSA", "Materia_id", "Periodo_id", "TipoCalificacion_id", "Calificacion_calif", "Calificacion_fecha", "Calificacion_comentario")
+        values
+        {','.join(calificaciones)}
+        on conflict ("Usuario_claveULSA", "Materia_id", "Periodo_id", "TipoCalificacion_id")
+        DO UPDATE SET "Calificacion_calif" = EXCLUDED."Calificacion_calif", "Calificacion_comentario" = EXCLUDED."Calificacion_comentario";
+        ''')
+    conexion.commit()
+except Exception as e:
+    print(f"Error al insertar/actualizar las calificaciones: {e} Stack: {e.__traceback__}")
+    # print the whole stack
+    if conexion:
+        conexion.rollback()  # Revertir cambios en caso de error
+
+conexion.close()

Some files were not shown because too many files changed in this diff