diff --git a/social_media_analyzer/operational_security.py b/social_media_analyzer/operational_security.py new file mode 100644 index 0000000..ac48dde --- /dev/null +++ b/social_media_analyzer/operational_security.py @@ -0,0 +1,81 @@ +import re +from sensitive_data_scanner.scanner import SENSITIVE_DATA_PATTERNS +from supply_chain_platform.security_tools import InfrastructureProtectionAI + +class CloudSecurityAI: + """AI for scanning cloud credentials and sensitive information.""" + + def _redact(self, value): + """Redacts a sensitive string, keeping only the first 4 and last 4 characters.""" + if len(value) <= 10: + return "****" + return f"{value[:4]}...{value[-4:]}" + + def scan_content(self, text_content): + findings = {} + for pattern_name, regex in SENSITIVE_DATA_PATTERNS.items(): + matches = regex.findall(text_content) + if matches: + # Redact each match to avoid full exposure + findings[pattern_name] = [self._redact(m) for m in matches] + return findings + +class IoTSecurityAI: + """AI for monitoring IoT device telemetry and detecting anomalies.""" + + def __init__(self): + self.infra_protection = InfrastructureProtectionAI() + + def analyze_telemetry(self, device_data): + """ + Wraps the InfrastructureProtectionAI logic for IoT telemetry analysis. + """ + return self.infra_protection.detect_iot_tampering(device_data) + +class OpSecAI: + """AI for Operational Security (OpSec) analysis of logs and procedures.""" + + SUSPICIOUS_OPSEC_PATTERNS = { + "Unauthorized Login Attempt": re.compile(r"failed login|unauthorized access|invalid credentials", re.I), + "Privilege Escalation": re.compile(r"sudo usage|root access granted|privilege elevation", re.I), + "Data Exfiltration Pattern": re.compile(r"large outbound transfer|data dump|exfiltrating", re.I), + "Internal Scan Activity": re.compile(r"nmap scan|port sweep|internal reconnaissance", re.I), + "Insecure Communication": re.compile(r"http transfer|unencrypted channel|plaintext password", re.I) + } + + def analyze_logs(self, log_entries): + """ + Analyzes a list of log strings for operational security risks. + """ + risk_score = 0 + findings = [] + + log_blob = "\n".join(log_entries) + + for threat_name, regex in self.SUSPICIOUS_OPSEC_PATTERNS.items(): + matches = regex.findall(log_blob) + if matches: + findings.append(f"{threat_name} detected: {len(matches)} occurrences.") + risk_score += len(matches) * 2 + + if not findings: + return {"status": "SECURE", "score": 0, "findings": ["No operational security threats detected."]} + else: + status = "CRITICAL" if risk_score > 10 else "WARNING" + return { + "status": status, + "score": min(risk_score, 100), + "findings": findings + } + +def analyze_cloud_security(content): + scanner = CloudSecurityAI() + return scanner.scan_content(content) + +def analyze_iot_security(device_data): + scanner = IoTSecurityAI() + return scanner.analyze_telemetry(device_data) + +def analyze_opsec_security(logs): + scanner = OpSecAI() + return scanner.analyze_logs(logs) diff --git a/social_media_analyzer/test_operational_security.py b/social_media_analyzer/test_operational_security.py new file mode 100644 index 0000000..4b0c84e --- /dev/null +++ b/social_media_analyzer/test_operational_security.py @@ -0,0 +1,35 @@ +import unittest +from social_media_analyzer.operational_security import CloudSecurityAI, IoTSecurityAI, OpSecAI + +class TestOperationalSecurity(unittest.TestCase): + def test_cloud_security_scan(self): + ai = CloudSecurityAI() + content = "My AWS Key is AKIA1234567890ABCDEF" + findings = ai.scan_content(content) + self.assertIn("AWS Access Key ID", findings) + # Verify redaction: AKIA1234567890ABCDEF -> AKIA...CDEF + self.assertEqual(findings["AWS Access Key ID"], ["AKIA...CDEF"]) + + def test_iot_security_analyze(self): + ai = IoTSecurityAI() + # Test warning case + device_data = {'voltage': 2.5, 'temperature': 80, 'rssi': -95} + result = ai.analyze_telemetry(device_data) + self.assertEqual(result["status"], "WARNING") + self.assertTrue(len(result["findings"]) > 0) + + # Test secure case + secure_data = {'voltage': 3.3, 'temperature': 25, 'rssi': -50} + result = ai.analyze_telemetry(secure_data) + self.assertEqual(result["status"], "SECURE") + + def test_opsec_analyze(self): + ai = OpSecAI() + logs = ["unauthorized access attempt", "nmap scan detected"] + result = ai.analyze_logs(logs) + self.assertEqual(result["status"], "WARNING") + self.assertTrue(any("Unauthorized Login Attempt" in f for f in result["findings"])) + self.assertTrue(any("Internal Scan Activity" in f for f in result["findings"])) + +if __name__ == "__main__": + unittest.main() diff --git a/src/Marketplace.jsx b/src/Marketplace.jsx index 912c0c1..a19a5a8 100644 --- a/src/Marketplace.jsx +++ b/src/Marketplace.jsx @@ -40,7 +40,7 @@ const tools = [ { id: 'assistance', name: 'Official Assistance', - description: 'Integrated support tools for Police, Military, and Gendarmerie.', + description: 'Integrated support tools for Police, Military, Gendarmerie, and Operational Security.', icon: '🛡️' } ]; diff --git a/src/OfficialAssistance.jsx b/src/OfficialAssistance.jsx index 95c4bee..4eb1cd4 100644 --- a/src/OfficialAssistance.jsx +++ b/src/OfficialAssistance.jsx @@ -30,11 +30,70 @@ const assistanceRoles = { { id: 'traffic', name: 'Traffic Management', icon: '🚦', desc: 'Coordination of road safety and major transit routes.' }, { id: 'response', name: 'Specialized Response', icon: '🚨', desc: 'Elite units for counter-terrorism and high-risk interventions.' } ] + }, + opsec: { + title: 'Operational Security', + icon: '🔐', + description: 'Cloud, IoT, and AI-driven security operations for modern infrastructure.', + tools: [ + { + id: 'cloud_guard', + name: 'Cloud Guard', + icon: '☁️', + desc: 'AI scanner for leaked credentials and sensitive cloud data.', + endpoint: '/analyze/cloud', + getPayload: () => ({ content: "Cloud scan simulation with fake AWS key: AKIA0000000000000000 and fake Google API Key: AIza00000000000000000000000000000000000" }) + }, + { + id: 'iot_shield', + name: 'IoT Shield', + icon: '🌐', + desc: 'Real-time anomaly detection for industrial IoT networks.', + endpoint: '/analyze/iot', + getPayload: () => ({ device_data: { voltage: 2.6, temperature: 82, rssi: -95 } }) + }, + { + id: 'opsec_analyzer', + name: 'OpSec Analyzer', + icon: '🕵️', + desc: 'AI-driven analysis of operational logs for procedural threats.', + endpoint: '/analyze/opsec', + getPayload: () => ({ logs: ["unauthorized access attempt", "nmap scan detected", "large outbound transfer", "sudo usage"] }) + } + ] } }; export default function OfficialAssistance() { const [activeRole, setActiveRole] = useState('police'); + const [analysisResult, setAnalysisResult] = useState(null); + const [loading, setLoading] = useState(false); + + const handleLaunch = async (tool) => { + if (!tool.endpoint) { + alert(`Launching ${tool.name}... (Simulation mode)`); + return; + } + + setLoading(true); + setAnalysisResult(null); + + try { + const response = await fetch(tool.endpoint, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(tool.getPayload()) + }); + + const data = await response.json(); + setAnalysisResult({ title: tool.name, data }); + } catch (error) { + console.error("Error launching tool:", error); + alert("Failed to connect to security backend. Make sure the Flask server is running."); + } finally { + setLoading(false); + } + }; return (
{tool.desc}
{JSON.stringify(analysisResult.data, null, 2)}
+
+