generated from github/codespaces-react
-
-
Notifications
You must be signed in to change notification settings - Fork 1
Operational Security Role Enhancement #33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
GYFX35
wants to merge
2
commits into
main
Choose a base branch
from
operational-security-role-enhancement-4755552451855147053
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,81 @@ | ||
| import re | ||
| from sensitive_data_scanner.scanner import SENSITIVE_DATA_PATTERNS | ||
| from supply_chain_platform.security_tools import InfrastructureProtectionAI | ||
|
|
||
| class CloudSecurityAI: | ||
| """AI for scanning cloud credentials and sensitive information.""" | ||
|
|
||
| def _redact(self, value): | ||
| """Redacts a sensitive string, keeping only the first 4 and last 4 characters.""" | ||
| if len(value) <= 10: | ||
| return "****" | ||
| return f"{value[:4]}...{value[-4:]}" | ||
|
|
||
| def scan_content(self, text_content): | ||
| findings = {} | ||
| for pattern_name, regex in SENSITIVE_DATA_PATTERNS.items(): | ||
| matches = regex.findall(text_content) | ||
| if matches: | ||
| # Redact each match to avoid full exposure | ||
| findings[pattern_name] = [self._redact(m) for m in matches] | ||
| return findings | ||
|
|
||
| class IoTSecurityAI: | ||
| """AI for monitoring IoT device telemetry and detecting anomalies.""" | ||
|
|
||
| def __init__(self): | ||
| self.infra_protection = InfrastructureProtectionAI() | ||
|
|
||
| def analyze_telemetry(self, device_data): | ||
| """ | ||
| Wraps the InfrastructureProtectionAI logic for IoT telemetry analysis. | ||
| """ | ||
| return self.infra_protection.detect_iot_tampering(device_data) | ||
|
|
||
| class OpSecAI: | ||
| """AI for Operational Security (OpSec) analysis of logs and procedures.""" | ||
|
|
||
| SUSPICIOUS_OPSEC_PATTERNS = { | ||
| "Unauthorized Login Attempt": re.compile(r"failed login|unauthorized access|invalid credentials", re.I), | ||
| "Privilege Escalation": re.compile(r"sudo usage|root access granted|privilege elevation", re.I), | ||
| "Data Exfiltration Pattern": re.compile(r"large outbound transfer|data dump|exfiltrating", re.I), | ||
| "Internal Scan Activity": re.compile(r"nmap scan|port sweep|internal reconnaissance", re.I), | ||
| "Insecure Communication": re.compile(r"http transfer|unencrypted channel|plaintext password", re.I) | ||
| } | ||
|
|
||
| def analyze_logs(self, log_entries): | ||
| """ | ||
| Analyzes a list of log strings for operational security risks. | ||
| """ | ||
| risk_score = 0 | ||
| findings = [] | ||
|
|
||
| log_blob = "\n".join(log_entries) | ||
|
|
||
| for threat_name, regex in self.SUSPICIOUS_OPSEC_PATTERNS.items(): | ||
| matches = regex.findall(log_blob) | ||
| if matches: | ||
| findings.append(f"{threat_name} detected: {len(matches)} occurrences.") | ||
| risk_score += len(matches) * 2 | ||
|
|
||
| if not findings: | ||
| return {"status": "SECURE", "score": 0, "findings": ["No operational security threats detected."]} | ||
| else: | ||
| status = "CRITICAL" if risk_score > 10 else "WARNING" | ||
| return { | ||
| "status": status, | ||
| "score": min(risk_score, 100), | ||
| "findings": findings | ||
| } | ||
|
|
||
| def analyze_cloud_security(content): | ||
| scanner = CloudSecurityAI() | ||
| return scanner.scan_content(content) | ||
|
|
||
| def analyze_iot_security(device_data): | ||
| scanner = IoTSecurityAI() | ||
| return scanner.analyze_telemetry(device_data) | ||
|
|
||
| def analyze_opsec_security(logs): | ||
| scanner = OpSecAI() | ||
| return scanner.analyze_logs(logs) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| import unittest | ||
| from social_media_analyzer.operational_security import CloudSecurityAI, IoTSecurityAI, OpSecAI | ||
|
|
||
| class TestOperationalSecurity(unittest.TestCase): | ||
| def test_cloud_security_scan(self): | ||
| ai = CloudSecurityAI() | ||
| content = "My AWS Key is AKIA1234567890ABCDEF" | ||
| findings = ai.scan_content(content) | ||
| self.assertIn("AWS Access Key ID", findings) | ||
| # Verify redaction: AKIA1234567890ABCDEF -> AKIA...CDEF | ||
| self.assertEqual(findings["AWS Access Key ID"], ["AKIA...CDEF"]) | ||
|
|
||
| def test_iot_security_analyze(self): | ||
| ai = IoTSecurityAI() | ||
| # Test warning case | ||
| device_data = {'voltage': 2.5, 'temperature': 80, 'rssi': -95} | ||
| result = ai.analyze_telemetry(device_data) | ||
| self.assertEqual(result["status"], "WARNING") | ||
| self.assertTrue(len(result["findings"]) > 0) | ||
|
|
||
| # Test secure case | ||
| secure_data = {'voltage': 3.3, 'temperature': 25, 'rssi': -50} | ||
| result = ai.analyze_telemetry(secure_data) | ||
| self.assertEqual(result["status"], "SECURE") | ||
|
|
||
| def test_opsec_analyze(self): | ||
| ai = OpSecAI() | ||
| logs = ["unauthorized access attempt", "nmap scan detected"] | ||
| result = ai.analyze_logs(logs) | ||
| self.assertEqual(result["status"], "WARNING") | ||
| self.assertTrue(any("Unauthorized Login Attempt" in f for f in result["findings"])) | ||
| self.assertTrue(any("Internal Scan Activity" in f for f in result["findings"])) | ||
|
|
||
| if __name__ == "__main__": | ||
| unittest.main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.