Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions social_media_analyzer/operational_security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import re
from sensitive_data_scanner.scanner import SENSITIVE_DATA_PATTERNS
from supply_chain_platform.security_tools import InfrastructureProtectionAI

class CloudSecurityAI:
"""AI for scanning cloud credentials and sensitive information."""

def _redact(self, value):
"""Redacts a sensitive string, keeping only the first 4 and last 4 characters."""
if len(value) <= 10:
return "****"
return f"{value[:4]}...{value[-4:]}"

def scan_content(self, text_content):
findings = {}
for pattern_name, regex in SENSITIVE_DATA_PATTERNS.items():
matches = regex.findall(text_content)
if matches:
# Redact each match to avoid full exposure
findings[pattern_name] = [self._redact(m) for m in matches]
return findings

class IoTSecurityAI:
"""AI for monitoring IoT device telemetry and detecting anomalies."""

def __init__(self):
self.infra_protection = InfrastructureProtectionAI()

def analyze_telemetry(self, device_data):
"""
Wraps the InfrastructureProtectionAI logic for IoT telemetry analysis.
"""
return self.infra_protection.detect_iot_tampering(device_data)

class OpSecAI:
"""AI for Operational Security (OpSec) analysis of logs and procedures."""

SUSPICIOUS_OPSEC_PATTERNS = {
"Unauthorized Login Attempt": re.compile(r"failed login|unauthorized access|invalid credentials", re.I),
"Privilege Escalation": re.compile(r"sudo usage|root access granted|privilege elevation", re.I),
"Data Exfiltration Pattern": re.compile(r"large outbound transfer|data dump|exfiltrating", re.I),
"Internal Scan Activity": re.compile(r"nmap scan|port sweep|internal reconnaissance", re.I),
"Insecure Communication": re.compile(r"http transfer|unencrypted channel|plaintext password", re.I)
}

def analyze_logs(self, log_entries):
"""
Analyzes a list of log strings for operational security risks.
"""
risk_score = 0
findings = []

log_blob = "\n".join(log_entries)

for threat_name, regex in self.SUSPICIOUS_OPSEC_PATTERNS.items():
matches = regex.findall(log_blob)
if matches:
findings.append(f"{threat_name} detected: {len(matches)} occurrences.")
risk_score += len(matches) * 2

if not findings:
return {"status": "SECURE", "score": 0, "findings": ["No operational security threats detected."]}
else:
status = "CRITICAL" if risk_score > 10 else "WARNING"
return {
"status": status,
"score": min(risk_score, 100),
"findings": findings
}

def analyze_cloud_security(content):
scanner = CloudSecurityAI()
return scanner.scan_content(content)

def analyze_iot_security(device_data):
scanner = IoTSecurityAI()
return scanner.analyze_telemetry(device_data)

def analyze_opsec_security(logs):
scanner = OpSecAI()
return scanner.analyze_logs(logs)
35 changes: 35 additions & 0 deletions social_media_analyzer/test_operational_security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import unittest
from social_media_analyzer.operational_security import CloudSecurityAI, IoTSecurityAI, OpSecAI

class TestOperationalSecurity(unittest.TestCase):
def test_cloud_security_scan(self):
ai = CloudSecurityAI()
content = "My AWS Key is AKIA1234567890ABCDEF"
findings = ai.scan_content(content)
self.assertIn("AWS Access Key ID", findings)
# Verify redaction: AKIA1234567890ABCDEF -> AKIA...CDEF
self.assertEqual(findings["AWS Access Key ID"], ["AKIA...CDEF"])

def test_iot_security_analyze(self):
ai = IoTSecurityAI()
# Test warning case
device_data = {'voltage': 2.5, 'temperature': 80, 'rssi': -95}
result = ai.analyze_telemetry(device_data)
self.assertEqual(result["status"], "WARNING")
self.assertTrue(len(result["findings"]) > 0)

# Test secure case
secure_data = {'voltage': 3.3, 'temperature': 25, 'rssi': -50}
result = ai.analyze_telemetry(secure_data)
self.assertEqual(result["status"], "SECURE")

def test_opsec_analyze(self):
ai = OpSecAI()
logs = ["unauthorized access attempt", "nmap scan detected"]
result = ai.analyze_logs(logs)
self.assertEqual(result["status"], "WARNING")
self.assertTrue(any("Unauthorized Login Attempt" in f for f in result["findings"]))
self.assertTrue(any("Internal Scan Activity" in f for f in result["findings"]))

if __name__ == "__main__":
unittest.main()
2 changes: 1 addition & 1 deletion src/Marketplace.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const tools = [
{
id: 'assistance',
name: 'Official Assistance',
description: 'Integrated support tools for Police, Military, and Gendarmerie.',
description: 'Integrated support tools for Police, Military, Gendarmerie, and Operational Security.',
icon: '🛡️'
}
];
Expand Down
104 changes: 103 additions & 1 deletion src/OfficialAssistance.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,70 @@ const assistanceRoles = {
{ id: 'traffic', name: 'Traffic Management', icon: '🚦', desc: 'Coordination of road safety and major transit routes.' },
{ id: 'response', name: 'Specialized Response', icon: '🚨', desc: 'Elite units for counter-terrorism and high-risk interventions.' }
]
},
opsec: {
title: 'Operational Security',
icon: '🔐',
description: 'Cloud, IoT, and AI-driven security operations for modern infrastructure.',
tools: [
{
id: 'cloud_guard',
name: 'Cloud Guard',
icon: '☁️',
desc: 'AI scanner for leaked credentials and sensitive cloud data.',
endpoint: '/analyze/cloud',
getPayload: () => ({ content: "Cloud scan simulation with fake AWS key: AKIA0000000000000000 and fake Google API Key: AIza00000000000000000000000000000000000" })
},
{
id: 'iot_shield',
name: 'IoT Shield',
icon: '🌐',
desc: 'Real-time anomaly detection for industrial IoT networks.',
endpoint: '/analyze/iot',
getPayload: () => ({ device_data: { voltage: 2.6, temperature: 82, rssi: -95 } })
},
{
id: 'opsec_analyzer',
name: 'OpSec Analyzer',
icon: '🕵️',
desc: 'AI-driven analysis of operational logs for procedural threats.',
endpoint: '/analyze/opsec',
getPayload: () => ({ logs: ["unauthorized access attempt", "nmap scan detected", "large outbound transfer", "sudo usage"] })
}
]
}
};

export default function OfficialAssistance() {
const [activeRole, setActiveRole] = useState('police');
const [analysisResult, setAnalysisResult] = useState(null);
const [loading, setLoading] = useState(false);

const handleLaunch = async (tool) => {
if (!tool.endpoint) {
alert(`Launching ${tool.name}... (Simulation mode)`);
return;
}

setLoading(true);
setAnalysisResult(null);

try {
const response = await fetch(tool.endpoint, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(tool.getPayload())
});

const data = await response.json();
setAnalysisResult({ title: tool.name, data });
} catch (error) {
console.error("Error launching tool:", error);
alert("Failed to connect to security backend. Make sure the Flask server is running.");
} finally {
setLoading(false);
}
};

return (
<div className="assistance-container">
Expand Down Expand Up @@ -63,10 +122,24 @@ export default function OfficialAssistance() {
<h3>{tool.name}</h3>
<p>{tool.desc}</p>
</div>
<button className="action-btn" onClick={() => alert(`Launching ${tool.name}...`)}>Launch</button>
<button
className="action-btn"
onClick={() => handleLaunch(tool)}
disabled={loading}
>
{loading ? 'Processing...' : 'Launch'}
</button>
</div>
))}
</div>

{analysisResult && (
<div className="analysis-result">
<h3>{analysisResult.title} - AI Analysis Output</h3>
<pre>{JSON.stringify(analysisResult.data, null, 2)}</pre>
<button className="close-result" onClick={() => setAnalysisResult(null)}>Close Results</button>
</div>
)}
</div>

<style jsx>{`
Expand Down Expand Up @@ -150,6 +223,35 @@ export default function OfficialAssistance() {
font-weight: bold;
cursor: pointer;
}
.action-btn:disabled {
background: #555;
cursor: not-allowed;
}
.analysis-result {
margin-top: 30px;
background: #1e2127;
padding: 20px;
border-radius: 8px;
border: 1px solid #61dafb;
}
.analysis-result pre {
background: #000;
padding: 15px;
border-radius: 5px;
overflow-x: auto;
color: #00ff00;
font-family: 'Courier New', Courier, monospace;
font-size: 0.85rem;
}
.close-result {
background: transparent;
color: #61dafb;
border: 1px solid #61dafb;
padding: 5px 15px;
border-radius: 4px;
cursor: pointer;
margin-top: 10px;
}
`}</style>
</div>
);
Expand Down
50 changes: 49 additions & 1 deletion text_message_analyzer/app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
from flask import Flask, request, jsonify
from social_media_analyzer import scam_detector, fake_news_detector, ai_content_detector, fake_content_verifier
from social_media_analyzer import (
scam_detector,
fake_news_detector,
ai_content_detector,
fake_content_verifier,
operational_security
)
import os

app = Flask(__name__)
Expand Down Expand Up @@ -51,6 +57,48 @@ def analyze_fake_content():
result = fake_content_verifier.analyze_text_for_fake_content(text_to_analyze)
return jsonify(result)

@app.route('/analyze/cloud', methods=['POST'])
def analyze_cloud():
data = request.get_json()
if not data or 'content' not in data:
return jsonify({"error": "Missing 'content' in request body"}), 400

content = data['content']
try:
result = operational_security.analyze_cloud_security(content)
return jsonify(result)
except Exception as e:
return jsonify({"error": f"Failed to analyze cloud security: {str(e)}"}), 500

@app.route('/analyze/iot', methods=['POST'])
def analyze_iot():
data = request.get_json()
if not data or 'device_data' not in data:
return jsonify({"error": "Missing 'device_data' in request body"}), 400

device_data = data['device_data']
try:
result = operational_security.analyze_iot_security(device_data)
return jsonify(result)
except Exception as e:
return jsonify({"error": f"Failed to analyze IoT security: {str(e)}"}), 500

@app.route('/analyze/opsec', methods=['POST'])
def analyze_opsec():
data = request.get_json()
if not data or 'logs' not in data:
return jsonify({"error": "Missing 'logs' in request body"}), 400

logs = data['logs']
if not isinstance(logs, list) or not all(isinstance(log, str) for log in logs):
return jsonify({"error": "'logs' must be a list of strings"}), 400

try:
result = operational_security.analyze_opsec_security(logs)
return jsonify(result)
except Exception as e:
return jsonify({"error": f"Failed to analyze operational security: {str(e)}"}), 500


if __name__ == '__main__':
app.run(debug=True)
Loading