Cuando pase el temblor: notificaciones de Jenkins a la carta

En 2024 se cumplirán 10 años de la muerte de Gustavo Cerati, reconocido por propios y extraños como una de las figuras más relevantes del rock en español. Si bien vivió gran parte de su carrera como solista, fue en 1985, mientras integraba Soda Stereo, que publicó el tema “Cuando pase el temblor“:

Soda Stereo se inmortaliza en la historia con su tema “Cuando pase el temblor” del disco “Nada personal”, publicado en 1985.

Lejos de las interpretaciones de los siempre crípticos temas de Soda Stereo, me centraré en el problema que plantea la literalidad de la canción: la necesidad de recibir una señal tras la ocurrencia de un suceso y la forma en la que esto puede resolverse usando tecnología (Jenkins y algunas otras cosas en nuestro caso).

Cuando pase el temblor: definiendo requerimientos

Bien, vamos considerar cuáles son los posibles requerimientos que derivan de “Cuando pase el temblor“:
– El protagonista pide ser despertado. Para nuestros efectos, diremos que desea ser notificado de algo.
– El protagonista quiere ser despertado solamente después de que haya ocurrido un temblor.

Lo anterior nos permite hacer varias consideraciones y delimitar el alcance de la solución que potencialmente podríamos ofrecer.

Despiertame…

Digitalmente hablando, podemos notificar a alguien a traves de correo, un SMS, o una notificación mediante alguna app como WhatsApp o Telegram. Considerando que el correo electrónico es posiblemente uno de los medios más extendidos de comunicación digital, procuraremos despertar al protagonista mediante una comunicación de este tipo.

…cuando pase el temblor.

En relación con la ocurrencia de los temblores, primero debemos realizar una delimitación geográfica pues los eventos sísmicos son comunes y abundantes en todo el goblo terráqueo. Apegados a la literalidad de la letra, el estribillo dice: “despiertame… cuando pase el temblor”. El protagonista pide a alguien más, un tercero, que le avise cuando ya el temblor pasó. Naturalmente, este tercero solamente puede avisar al protagonista si percibió el temblor en primer término. Como la notificación la realizaré yo, es lógico que solamente debería avisar después de que percibí un temblor y éste ya ha cesado. De este modo, limitaré las notificaciones a los temblores ocurridos en mi ubicación geográfica actual: Costa Rica. Como veremos en breve, el OVSICORI mantiene información actualizada acerca de los sismos sentidos por la población.

Obteniendo información sísmica en Costa Rica

El Observatorio Vulcanológico y Sismológico de Costa Rica (OVSICORI) es el encargado de la “vigilancia sísmica y volcánica” de Costa Rica y se encuentra adscrito a la Universidad Nacional de Costa Rica. En su página web ofrece información de toda clase, incluyendo un apartado de “Sismología” que incluye información sobre “Sismos sentidos“. La página web no ofrece una definición de lo que se entiende por sismos sentidos, pero asumiremos que se trata de aquellos que la propia población reporta a través de los diferentes canales.

No todos los sismos adquiren el estatus de “sentidos“, como puede apreciarse fácilmente al compararlos con la información que figura en la tabla de “Sismos recientes“. Para nuestros efectos, utilizaremos solamente la información que corresponde a la tabla de sismos sentidos e ignoraremos las demás tablas disponibles. Lamentablemente, no encontré una API que me permitirera consultar la información, de modo que tendremos que hacer algo de “web scrapping” para obtener lo que necesitamos.

En resumen

Considerando lo anterior, haremos lo siguiente:
– Utilizando Python y Jenkins, consultaremos periodicamente la tabla “Sismos Sentidos” del OVSICORI y almacenaremos la información del sismo más reciente. Luego, cada vez que verifiquemos la tabla compararemos la información más reciente del OVSICORI con la informacion almacenada previamente. Si ambas fuentes difieren, sabremos que hubo un sismo nuevo y enviaremos una notificación por correo electrónico. Por el contrario, si ambas fuentes son iguales, no notificaremos nada.

Diagrama del proyecto

Jenkins nos permitirá unir los diferentes elementos y notificar cuando se requiera. La estructura del código luce así:

Código fuente:

git clone git@github.com:andreypicado506/cuando-pase-el-temblor.git

Scripts de Python

Primero repasaremos los scripts necesarios para obtener la información desde la página del OVSICORI. Dicha información no proviene de un API, así que es preciso obtenerla del elemento tabla disponible. La primera fila corresponde al encabezado y debe ignorarse, luego de esto se encuentra siempre el sismo más reciente. La tabla puede obtenerse utilizando el módulo requests y la información puede filtrarse mediante los tags del tipo de elemento HTML usando una libería como bs4. He intentando comentar el código ampliamente y he utilizado type hints para cada caso:

#!/usr/bin/env python3

import argparse
import requests
from bs4 import BeautifulSoup
from typing import Union, List, Dict, Optional

def get_seismic_data(url: str) -> Union[requests.Response, None]:
    """
    Fetch seismic data from a specified URL using an HTTP GET request.

    :param url: The URL to fetch seismic data from.
    """
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return response
        else:
            print(f"API request failed with status code: {response.status_code}")
            return None
    except Exception as e:
        print(f"Error while making API request: {str(e)}")
        return None

def parse_seismic_data(html_content: requests.Response, header_tag: str ='tr', amount_of_rows: int =1) -> List[Dict[str, str]]:
    """
    Parse seismic data from an HTML table and return a list of dictionaries.

    :param html_content: HTML content as a BeautifulSoup object.
    :param header_tag: The HTML tag to match for rows (default is 'tr').
    :param amount_of_rows: The number of rows to parse (default is 1).
    """
    seisms = []
    soup = BeautifulSoup(html_content.text, 'html.parser')
    # Find the first table row (excluding the header row)
    rows = soup.find_all(header_tag, class_='header')[0].find_next(header_tag)
    for row in range(amount_of_rows):
        target_row = rows.contents[row].find_all('td')
        seism = {
            'date': target_row[0].text.strip(),
            'time': target_row[1].text.strip(),
            'magnitude': target_row[2].text.strip()
        }
        seisms.append(seism)
    return seisms

def main() -> List[Dict[str, str]]:
    parser = argparse.ArgumentParser(description="Script that get data from the last earthquake in Costa Rica.")
    parser.add_argument("-u", "--ovsicori-url", type=str, required=True, help="'Sismos Sentidos' table from the OVSICORI website.")

    ovsicori_url = parser.parse_args().ovsicori_url

    html_content = get_seismic_data(ovsicori_url)
    seismic_data = parse_seismic_data(html_content)
    
    return seismic_data

if __name__== "__main__":
    main()

El siguiente script se centra en operaciones con S3. Primero, cada vez que obtiene información sobre un sismo nuevo, debe verificar si el archivo para almacenar esa información existe en el bucket de S3. Si no existe (como ocurrirá durante la primera ejecución) deberá crearlo. Si existe, debe leerlo y compararlo con la información más reciente proveniente del OVSICORI. Si la información del OVSICORI es más reciente que la existente en el archivo en S3, deberá actualizarlo. Las operaciones con AWS las he realizado usando boto3. La autenticación con AWS se maneja de forma segura desde Jenkins mediante una conjunción del Credential Manager y el plugin “Pipeline: AWS Steps“, aunque es posible crear una sesión y utilizar credenciales como parámetros cuando se llama al script.

#!/usr/bin/env python3

import argparse
import boto3
from typing import NoReturn, Union

def s3_file_exist(bucket_name, file_key) -> bool:
    """
    Check if a file exists in an S3 bucket.

    :param bucket_name: The name of the S3 bucket.
    :param file_key: The key (path) of the file within the bucket.
    """
    s3 = boto3.client('s3')
    try:
        s3.head_object(Bucket=bucket_name, Key=file_key)
        return True
    except Exception as e:
        return False

def create_s3_file(bucket_name, file_key, content) -> NoReturn:
    """
    Create a new file in an S3 bucket with the provided content.

    :param bucket_name: The name of the S3 bucket.
    :param file_key: The key (path) of the new file within the bucket.
    :param content: The content to be written to the file.
    """
    s3 = boto3.client('s3')
    try:
        s3.put_object(Bucket=bucket_name, Key=file_key, Body=content.encode('utf-8'))
    except Exception as e:
        print(f"Error creating S3 file: {e}")

def read_s3_file(bucket_name, file_key) -> Union[str, None]:
    """
    Read the content of a file from an S3 bucket.

    :param bucket_name: The name of the S3 bucket.
    :param file_key: The key (path) of the file within the bucket.
    :return: The content of the file as a UTF-8 encoded string, or None if there was an error.
    """
    s3 = boto3.client('s3')
    try:
        response = s3.get_object(Bucket=bucket_name, Key=file_key)
        content = response['Body'].read().decode('utf-8')
        return content
    except Exception as e:
        return None

def update_s3_file(bucket_name, file_key, new_content) -> NoReturn:
    """
    Update the content of an existing file in an S3 bucket with new content.

    :param bucket_name: The name of the S3 bucket.
    :param file_key: The key (path) of the file to be updated within the bucket.
    :param new_content: The new content to replace the existing content of the file.
    """
    s3 = boto3.client('s3')
    try:
        s3.put_object(Bucket=bucket_name, Key=file_key, Body=new_content.encode('utf-8'))
    except Exception as e:
        print(f"Error updating S3 file: {e}")

def main() -> bool:
    # Parser for args
    parser = argparse.ArgumentParser(description="Script to check, read and write a file in AWS S3.")
    parser.add_argument("-b", "--bucket-name", type=str, required=True)
    parser.add_argument("-f", "--file-key", type=str, required=True)
    parser.add_argument("-l", "--local-file", required=True)

    # Main vars
    bucket_name          = parser.parse_args().bucket_name
    file_key             = parser.parse_args().file_key
    local_file_path      = parser.parse_args().local_file

    # Read the file that was previously created in the pipeline
    # it will have the latest data from OVSICORI
    with open(local_file_path, 'r') as file:
        last_earthquake_data = file.read()

    # Check if the file exists in the s3 bucket, if not, create it
    s3_file_created = s3_file_exist(bucket_name, file_key)
    if not s3_file_created:
        create_s3_file(bucket_name, file_key, last_earthquake_data)
        return False
    
    # Compare current data from OVSICORI 
    # with the info stored in the s3 file
    current_content = read_s3_file(bucket_name, file_key)
    if current_content != last_earthquake_data:
        update_s3_file(bucket_name, file_key, last_earthquake_data)
        return True
    else:
        return False

if __name__ == "__main__":
    main()

Jenkinsfile

El pipeline de Jenkins configura automáticamente el job mediante la opción Pipeline > Definition > Pipeline script from SCM, de forma tal que todas las propiedades del pipeline se encuentran en código, incluyendo disparadores (triggers), parámetros, agentes y notificaciones. No obstante, es precisa alguna configuración en Jenkins, lo cual se tratará más adelante.


def lastEarthquakeData
def ceratiShouldBeWakeUp

pipeline {
    agent {
        docker {
            image 'andreypicado506/custom-python3:cerati'
        }
    }
    triggers {
        cron('H/30 * * * *')
    }
    parameters {
        string(
        name:'OVSICORI_URL',
        defaultValue:'http://www.ovsicori.una.ac.cr/sistemas/sentidos_map/index.php?tipo=center',
        description: 'URL de la tabla <Sismos Sentidos> del sitio web del OVSICORI.')
        string(
        name: 'S3_BUCKET_NAME',
        defaultValue: 'cerati-bucket',
        description: 'S3 bucket en donde está el archivo con los datos sísmicos.')
        string(
        name: 'S3_FILE_NAME',
        defaultValue: 'sismos',
        description: 'Nombre del archivo que contiene los datos sísmicos (se creará si no existe)')
    }
    environment {
        GITHUB_REPO_URL = 'git@github.com:andreypicado506/cuando-pase-el-temblor.git'
    }

    stages {
        stage('Obtener código fuente de Github') {
            steps {
                // Checkout the GitHub repository
                script {
                    git branch: 'main', credentialsId: 'github_wake_up', url: "${GITHUB_REPO_URL}"
                }
            }
        }

        stage('Obtener actividad sísmica más reciente') {
            steps {
                    script {
                        lastEarthquakeData = sh(
                            script: "python3 scripts/get_seismic_data.py -u '${OVSICORI_URL}'",
                            returnStdout: true).trim()
                        sh "echo '${lastEarthquakeData}' > ${S3_FILE_NAME}"
                    }
            }
        }
        stage('Validar si Cerati debe ser despertado') {
            steps {
                withAWS(
                    credentials:'aws_main',
                    region:     'us-west-2'
                ) {
                    script {
                        ceratiShouldBeWakeUp = sh(
                        script: 'python3 scripts/check_s3.py -b ${S3_BUCKET_NAME} \
                        -f ${S3_FILE_NAME} -l ${S3_FILE_NAME}',
                        returnStdout: true).trim()
                    }
                }
            }
        }
    }

    post {
        success {
            script{
                if (ceratiShouldBeWakeUp == 'True') {
                    mail(
                        subject: '¡Despierta!',
                        body:    'Ya pasó el temblor.',
                        to:      'notifications@andreypicado.com'
                    )
                    echo '¡Cerati debe ser despertado!'
                }
                else {
                    echo 'Cerati no debe ser despertado.'
                }
            }
        }
        failure {
            // This block will be executed if the pipeline fails
            echo 'Algo salió mal.'
        }
    }
}

Contenedor de Docker custom

Para correr los scripts anteriores, cree una imagen personalizada de Docker (docker pull andreypicado506/custom-python3:cerati) que utiliza como base la imagen de Python 3.12 y simplemente instala las depedencias necesarias utilizando pip. Dicha imagen puede construirse con el dockerfile disponible en el código fuente.

Configuraciones necesarias en Jenkins

La notificación que el pipeline envía se realiza mediante Amazon Simple Email Service (SES), pero Jenkins ofrece soporte para cualquier servicio SMTP. La configuración dependerá del servicio en cada caso. Además de esto, se utilizó el plugin “Pipeline: AWS Steps” para utilizar de forma segura las credenciales previamente almacenadas en el Credential Manager. No obstante, dichas credenciales pueden ser utilizadas mediante los métodos comunes que ofrece Groovy (withCredentials, por ejemplo). Asimismo, deben incorporarse al Credential Manager las llaves respectivas en caso de que se utilice algún servicio como Github o Gitlab.

Para una instalación limpia de Jenkins con Docker, recomiendo utilizar esta guía.

El pipeline en acción

El pipeline corre cada 30 minutos. Cuando determina que no hay un sismo nuevo que reportar, simplemente lo indica en el output del job:

Por el contrario, cuando determina que se produjo un sismo envía un correo electrónico:

El email luce así:

| Theme: UPortfolio