🛡️ PASAIA LAB: Monitor de Ciberseguridad con Big Data e IA Descentralizada
En PASAIA LAB, hemos desarrollado un sistema de defensa cibernética de vanguardia que integra la potencia de Big Data con la inteligencia colectiva de redes descentralizadas como Bittensor (TAO) y la automatización de agentes autónomos en NEAR Protocol.
El Desafío: Ataques Cada Vez Más Sofisticados
Los ataques cibernéticos ya no son simples. Requieren una defensa que no solo reaccione a lo conocido, sino que aprenda, se adapte y actúe de forma autónoma. Aquí es donde nuestro "Escudo Pasaia 2026" marca la diferencia.
¿Cómo Funciona el Escudo de PASAIA LAB?
- Vigilancia Global (Bittensor - TAO): Antes de que un dato entre a nuestra red, se consulta a una vasta red de modelos de IA en Bittensor. Si la inteligencia colectiva global detecta patrones maliciosos (ej. phishing, malware de día cero), el tráfico es bloqueado preventivamente.
- Agentes Autónomos (NEAR Protocol): Si una anomalía es detectada internamente, un "Agente de Seguridad" autónomo desplegado en NEAR ejecuta un Smart Contract para, por ejemplo, congelar credenciales, aislar un dispositivo o desviar el tráfico sospechoso. Todo esto ocurre en milisegundos.
- Memoria y Aprendizaje (Big Data Interno): Todos los eventos, normales y anómalos, se registran en nuestro "Árbol de Datos". Esto no solo cumple con auditorías, sino que también sirve para re-entrenar nuestros modelos de IA, haciendo el sistema más robusto con cada incidente.
Monitor en Tiempo Real: La Sala de Control de PASAIA LAB
Para visualizar este proceso, hemos creado un monitor en Python que simula la detección de anomalías en el tráfico de red. Utiliza el algoritmo de Z-Score para identificar picos de actividad inusuales, que podrían indicar exfiltración de datos o un ataque.
import numpy as np
import time
import json
from datetime import datetime
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from collections import deque
class PasaiaShield:
def __init__(self, threshold=3.0):
self.threshold = threshold
self.history = deque(maxlen=100)
self.audit_log_file = "audit_pasaia_lab.json"
# Para el monitor visual
self.x_data = deque(maxlen=50)
self.y_data = deque(maxlen=50)
self.z_scores = deque(maxlen=50)
self.alerts = deque(maxlen=50)
def ingest_traffic(self, packet_size):
self.history.append(packet_size)
def log_attack_to_json(self, packet_size, z_score):
attack_event = {
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"event_type": "ANOMALY_DETECTED",
"packet_size_kb": packet_size,
"severity_score": round(z_score, 2),
"protocol_action": "NEAR_SMART_CONTRACT_BLOCK",
"threat_intelligence": "TAO_SUBNET_REPORTED"
}
try:
try:
with open(self.audit_log_file, "r") as f:
data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
data = []
data.append(attack_event)
with open(self.audit_log_file, "w") as f:
json.dump(data, f, indent=4)
print(f"💾 Evento registrado en {self.audit_log_file}")
except Exception as e:
print(f"❌ Error al guardar auditoría: {e}")
def analyze_risk(self, current_packet, index):
if len(self.history) < 10:
return "ENTRENANDO...", 0, False
mean = np.mean(self.history)
std_dev = np.std(self.history)
z_score = abs(current_packet - mean) / std_dev if std_dev > 0 else 0
is_alert = False
if z_score > self.threshold:
self.log_attack_to_json(current_packet, z_score)
is_alert = True
self.x_data.append(index)
self.y_data.append(current_packet)
self.z_scores.append(z_score)
self.alerts.append(is_alert)
return "⚠️ ALERTA: ANOMALÍA DETECTADA" if is_alert else "✅ TRÁFICO NORMAL", z_score, is_alert
def animate(i, shield_instance, line_packet, line_zscore, ax1, ax2):
if i % 10 == 0 and i > 0:
packet = np.random.normal(5000, 100) if np.random.rand() < 0.2 else np.random.normal(500, 50)
else:
packet = np.random.normal(500, 50)
shield_instance.ingest_traffic(packet)
status, score, is_alert = shield_instance.analyze_risk(packet, i)
line_packet.set_data(list(shield_instance.x_data), list(shield_instance.y_data))
ax1.set_xlim(shield_instance.x_data[0], shield_instance.x_data[-1] + 1)
ax1.set_ylim(min(shield_instance.y_data) * 0.9, max(shield_instance.y_data) * 1.1)
line_zscore.set_data(list(shield_instance.x_data), list(shield_instance.z_scores))
ax2.set_xlim(shield_instance.x_data[0], shield_instance.x_data[-1] + 1)
ax2.set_ylim(0, max(max(shield_instance.z_scores) * 1.2, shield_instance.threshold * 1.5))
ax2.axhline(shield_instance.threshold, color='r', linestyle='--', label=f'Umbral Z-Score ({shield_instance.threshold})')
alert_x = [shield_instance.x_data[j] for j, alert in enumerate(shield_instance.alerts) if alert]
alert_y = [shield_instance.y_data[j] for j, alert in enumerate(shield_instance.alerts) if alert]
ax1.plot(alert_x, alert_y, 'ro', markersize=8, fillstyle='none')
ax1.set_title(f"PASAIA LAB: Monitor de Tráfico | {status}", color='red' if is_alert else 'green')
return line_packet, line_zscore,
if __name__ == "__main__":
escudo = PasaiaShield(threshold=3.0)
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
fig.suptitle('PASAIA LAB: Escudo de Ciberseguridad IA', fontsize=16)
line_packet, = ax1.plot([], [], 'g-', label='Tamaño de Paquete (KB)')
ax1.set_ylabel('Tamaño de Paquete (KB)')
ax1.legend()
ax1.grid(True)
line_zscore, = ax2.plot([], [], 'b-', label='Z-Score')
ax2.set_xlabel('Tiempo (Iteraciones)')
ax2.set_ylabel('Z-Score')
ax2.legend()
ax2.grid(True)
ax2.axhline(escudo.threshold, color='r', linestyle='--', label=f'Umbral Z-Score ({escudo.threshold})')
ani = animation.FuncAnimation(fig, animate, fargs=(escudo, line_packet, line_zscore, ax1, ax2),
interval=100, blit=True, cache_frame_data=False)
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.show()
import numpy as np
import time
import json
from datetime import datetime
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from collections import deque # Para almacenar datos de forma eficiente
class PasaiaShield:
def __init__(self, threshold=3.0):
self.threshold = threshold
self.history = deque(maxlen=100) # Usamos deque para eficiencia
self.audit_log_file = "audit_pasaia_lab.json"
# Para el monitor visual
self.x_data = deque(maxlen=50) # Tiempo o índices
self.y_data = deque(maxlen=50) # Tamaño de paquete
self.z_scores = deque(maxlen=50) # Z-Score calculado
self.alerts = deque(maxlen=50) # Marcar alertas
def ingest_traffic(self, packet_size):
"""Simula la entrada de datos al sistema"""
self.history.append(packet_size)
def log_attack_to_json(self, packet_size, z_score):
"""Guarda el ataque en la base de datos de auditoría"""
attack_event = {
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"event_type": "ANOMALY_DETECTED",
"packet_size_kb": packet_size,
"severity_score": round(z_score, 2),
"protocol_action": "NEAR_SMART_CONTRACT_BLOCK",
"threat_intelligence": "TAO_SUBNET_REPORTED"
}
try:
try:
with open(self.audit_log_file, "r") as f:
data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
data = []
data.append(attack_event)
with open(self.audit_log_file, "w") as f:
json.dump(data, f, indent=4)
print(f"💾 Evento registrado en {self.audit_log_file}")
except Exception as e:
print(f"❌ Error al guardar auditoría: {e}")
def analyze_risk(self, current_packet, index):
if len(self.history) < 10:
return "ENTRENANDO...", 0, False
mean = np.mean(self.history)
std_dev = np.std(self.history)
z_score = abs(current_packet - mean) / std_dev if std_dev > 0 else 0
is_alert = False
if z_score > self.threshold:
self.log_attack_to_json(current_packet, z_score)
is_alert = True
# Actualiza datos para el monitor
self.x_data.append(index)
self.y_data.append(current_packet)
self.z_scores.append(z_score)
self.alerts.append(is_alert)
return "⚠️ ALERTA: ANOMALÍA DETECTADA" if is_alert else "✅ TRÁFICO NORMAL", z_score, is_alert
# --- FUNCIÓN DE ACTUALIZACIÓN DEL MONITOR ---
def animate(i, shield_instance, line_packet, line_zscore, ax1, ax2):
# Genera un paquete de tráfico (simulación)
if i % 10 == 0 and i > 0: # Simula un ataque cada cierto tiempo
packet = np.random.normal(5000, 100) if np.random.rand() < 0.2 else np.random.normal(500, 50)
else:
packet = np.random.normal(500, 50) # Tráfico normal
shield_instance.ingest_traffic(packet)
status, score, is_alert = shield_instance.analyze_risk(packet, i)
# Actualiza el gráfico de tamaño de paquete
line_packet.set_data(list(shield_instance.x_data), list(shield_instance.y_data))
ax1.set_xlim(shield_instance.x_data[0], shield_instance.x_data[-1] + 1)
ax1.set_ylim(min(shield_instance.y_data) * 0.9, max(shield_instance.y_data) * 1.1)
# Actualiza el gráfico de Z-Score
line_zscore.set_data(list(shield_instance.x_data), list(shield_instance.z_scores))
ax2.set_xlim(shield_instance.x_data[0], shield_instance.x_data[-1] + 1)
ax2.set_ylim(0, max(max(shield_instance.z_scores) * 1.2, shield_instance.threshold * 1.5))
ax2.axhline(shield_instance.threshold, color='r', linestyle='--', label=f'Umbral Z-Score ({shield_instance.threshold})')
# Marcar alertas
alert_x = [shield_instance.x_data[j] for j, alert in enumerate(shield_instance.alerts) if alert]
alert_y = [shield_instance.y_data[j] for j, alert in enumerate(shield_instance.alerts) if alert]
ax1.plot(alert_x, alert_y, 'ro', markersize=8, fillstyle='none') # Círculos rojos en los paquetes anómalos
# Título dinámico
ax1.set_title(f"PASAIA LAB: Monitor de Tráfico | {status}", color='red' if is_alert else 'green')
return line_packet, line_zscore,
# --- CONFIGURACIÓN DEL MONITOR ---
if __name__ == "__main__":
escudo = PasaiaShield(threshold=3.0) # Umbral de alerta más estricto
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
fig.suptitle('PASAIA LAB: Escudo de Ciberseguridad IA', fontsize=16)
# Gráfico 1: Tamaño de Paquete
line_packet, = ax1.plot([], [], 'g-', label='Tamaño de Paquete (KB)')
ax1.set_ylabel('Tamaño de Paquete (KB)')
ax1.legend()
ax1.grid(True)
# Gráfico 2: Z-Score de Anomalía
line_zscore, = ax2.plot([], [], 'b-', label='Z-Score')
ax2.set_xlabel('Tiempo (Iteraciones)')
ax2.set_ylabel('Z-Score')
ax2.legend()
ax2.grid(True)
ax2.axhline(escudo.threshold, color='r', linestyle='--', label=f'Umbral Z-Score ({escudo.threshold})') # Línea de umbral
# Inicia la animación
ani = animation.FuncAnimation(fig, animate, fargs=(escudo, line_packet, line_zscore, ax1, ax2),
interval=100, blit=True, cache_frame_data=False) # Intervalo en ms
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.show()
Visualización en Acción (Captura del Monitor)
Aquí puedes insertar una imagen (JPG/PNG) o un GIF animado de cómo se ve el monitor en tiempo real. Esto hará que tu publicación sea mucho más atractiva y fácil de entender.
Conclusión: Hacia una Ciberseguridad Inteligente y Descentralizada
El "Escudo Pasaia 2026" representa el futuro de la ciberseguridad: un sistema proactivo, autónomo y globalmente inteligente. No solo protegemos nuestros datos, sino que contribuimos a una red de defensa más robusta para todos.
Equipo de Seguridad, PASAIA LAB
https://substack.com/@agustintxo
https://agustintxo.substack.com/
BRAINSTORMING
- Tormenta de Ideas de PASAIA LAB © 2025 by José
Agustín Fontán Varela is licensed under CC
BY-NC-ND 4.0
BRAINSTORMING
- Tormenta de Ideas de PASAIA LAB © 2025 by José
Agustín Fontán Varela is licensed under Creative
Commons Attribution-NonCommercial-NoDerivatives 4.0 International
Tormenta Work Free Intelligence + IA Free Intelligence Laboratory by José Agustín Fontán Varela is licensed under CC BY-NC-ND 4.0







