secure-web/backend/scanner/headers_scanner.py

509 lines
21 KiB
Python

"""
HTTP Headers Security Scanner.
This module checks HTTP response headers for security best practices,
including CSP, HSTS, X-Frame-Options, and cookie security flags.
"""
import time
import logging
import ssl
import socket
from typing import Dict, List, Optional, Tuple
from urllib.parse import urlparse
import httpx
from .base import BaseScanner, ScannerResult, ScannerStatus
logger = logging.getLogger('scanner')
class HeadersScanner(BaseScanner):
"""
Scanner for HTTP security headers and TLS configuration.
Checks for:
- Security headers (CSP, HSTS, X-Frame-Options, etc.)
- Cookie security flags (Secure, HttpOnly, SameSite)
- CORS configuration
- TLS/SSL certificate validity
- HTTP to HTTPS redirect
"""
name = "header_check"
# Required security headers and their importance
SECURITY_HEADERS = {
'strict-transport-security': {
'severity': 'high',
'title': 'Missing Strict-Transport-Security (HSTS) header',
'description': 'HSTS ensures browsers only connect via HTTPS, preventing SSL stripping attacks.',
'remediation': 'Add header: Strict-Transport-Security: max-age=31536000; includeSubDomains; preload'
},
'content-security-policy': {
'severity': 'high',
'title': 'Missing Content-Security-Policy (CSP) header',
'description': 'CSP helps prevent XSS attacks by controlling which resources can be loaded.',
'remediation': "Add a Content-Security-Policy header. Start with: Content-Security-Policy: default-src 'self'"
},
'x-frame-options': {
'severity': 'medium',
'title': 'Missing X-Frame-Options header',
'description': 'X-Frame-Options prevents clickjacking attacks by controlling iframe embedding.',
'remediation': 'Add header: X-Frame-Options: DENY or X-Frame-Options: SAMEORIGIN'
},
'x-content-type-options': {
'severity': 'medium',
'title': 'Missing X-Content-Type-Options header',
'description': 'Prevents MIME type sniffing, reducing risk of drive-by downloads.',
'remediation': 'Add header: X-Content-Type-Options: nosniff'
},
'referrer-policy': {
'severity': 'low',
'title': 'Missing Referrer-Policy header',
'description': 'Controls how much referrer information is sent with requests.',
'remediation': 'Add header: Referrer-Policy: strict-origin-when-cross-origin'
},
'permissions-policy': {
'severity': 'low',
'title': 'Missing Permissions-Policy header',
'description': 'Controls which browser features can be used by the page.',
'remediation': 'Add header: Permissions-Policy: geolocation=(), camera=(), microphone=()'
},
}
def __init__(self, config: dict = None):
super().__init__(config)
self.timeout = self.config.get('timeout', 30)
def run(self, url: str) -> ScannerResult:
"""
Run HTTP headers and TLS security checks.
Args:
url: The URL to scan
Returns:
ScannerResult with header analysis
"""
start_time = time.time()
issues = []
metrics = []
raw_data = {
'headers': {},
'cookies': [],
'tls': {},
'redirects': [],
}
try:
# Create HTTP client with redirect following
with httpx.Client(
timeout=self.timeout,
follow_redirects=True,
verify=True
) as client:
# Make GET request
response = client.get(url)
# Store response headers
raw_data['headers'] = dict(response.headers)
raw_data['status_code'] = response.status_code
raw_data['final_url'] = str(response.url)
# Track redirects
raw_data['redirects'] = [
{
'url': str(r.url),
'status_code': r.status_code
}
for r in response.history
]
# Check for HTTP to HTTPS redirect
redirect_issues = self._check_https_redirect(url, response)
issues.extend(redirect_issues)
# Check security headers
header_issues = self._check_security_headers(response.headers)
issues.extend(header_issues)
# Check cookies
cookie_issues, cookie_data = self._check_cookies(response)
issues.extend(cookie_issues)
raw_data['cookies'] = cookie_data
# Check CORS
cors_issues = self._check_cors(response.headers, url)
issues.extend(cors_issues)
# Check for information disclosure
info_issues = self._check_info_disclosure(response.headers)
issues.extend(info_issues)
# Check TLS certificate
tls_issues, tls_data = self._check_tls(url)
issues.extend(tls_issues)
raw_data['tls'] = tls_data
# Add metrics
metrics.append(self._create_metric(
'security_headers_present',
'Security Headers Present',
self._count_present_headers(raw_data['headers']),
'count'
))
metrics.append(self._create_metric(
'security_headers_total',
'Total Security Headers Checked',
len(self.SECURITY_HEADERS),
'count'
))
execution_time = time.time() - start_time
return ScannerResult(
status=ScannerStatus.SUCCESS,
scanner_name=self.name,
issues=issues,
metrics=metrics,
raw_data=raw_data,
execution_time_seconds=execution_time
)
except httpx.TimeoutException as e:
return ScannerResult(
status=ScannerStatus.FAILED,
scanner_name=self.name,
error_message=f"Request timed out: {e}",
execution_time_seconds=time.time() - start_time
)
except httpx.RequestError as e:
return ScannerResult(
status=ScannerStatus.FAILED,
scanner_name=self.name,
error_message=f"Request failed: {e}",
execution_time_seconds=time.time() - start_time
)
except Exception as e:
logger.exception(f"Header check failed for {url}")
return ScannerResult(
status=ScannerStatus.FAILED,
scanner_name=self.name,
error_message=f"Unexpected error: {e}",
execution_time_seconds=time.time() - start_time
)
def _check_security_headers(self, headers: httpx.Headers) -> List[dict]:
"""Check for missing or misconfigured security headers."""
issues = []
headers_lower = {k.lower(): v for k, v in headers.items()}
for header_name, config in self.SECURITY_HEADERS.items():
if header_name not in headers_lower:
issues.append(self._create_issue(
category='headers',
severity=config['severity'],
title=config['title'],
description=config['description'],
remediation=config['remediation']
))
else:
# Check for weak configurations
value = headers_lower[header_name]
if header_name == 'strict-transport-security':
# Check for short max-age
if 'max-age' in value.lower():
try:
max_age = int(value.lower().split('max-age=')[1].split(';')[0])
if max_age < 31536000: # Less than 1 year
issues.append(self._create_issue(
category='headers',
severity='low',
title='HSTS max-age is too short',
description=f'HSTS max-age is {max_age} seconds. Recommend at least 1 year (31536000).',
remediation='Increase max-age to at least 31536000 seconds.'
))
except (IndexError, ValueError):
pass
elif header_name == 'content-security-policy':
# Check for unsafe directives
if "'unsafe-inline'" in value or "'unsafe-eval'" in value:
issues.append(self._create_issue(
category='headers',
severity='medium',
title='CSP contains unsafe directives',
description="Content-Security-Policy contains 'unsafe-inline' or 'unsafe-eval' which weakens XSS protection.",
remediation="Remove 'unsafe-inline' and 'unsafe-eval' from CSP. Use nonces or hashes instead."
))
return issues
def _check_https_redirect(self, original_url: str, response: httpx.Response) -> List[dict]:
"""Check for HTTP to HTTPS redirect."""
issues = []
parsed = urlparse(original_url)
if parsed.scheme == 'http':
final_url = str(response.url)
final_parsed = urlparse(final_url)
if final_parsed.scheme == 'https':
# Good - redirects to HTTPS
pass
else:
issues.append(self._create_issue(
category='tls',
severity='high',
title='No HTTP to HTTPS redirect',
description='The site does not redirect HTTP requests to HTTPS, allowing insecure connections.',
remediation='Configure server to redirect all HTTP traffic to HTTPS (301 redirect).'
))
return issues
def _check_cookies(self, response: httpx.Response) -> Tuple[List[dict], List[dict]]:
"""Check cookie security flags."""
issues = []
cookie_data = []
# Get cookies from Set-Cookie headers
set_cookie_headers = response.headers.get_list('set-cookie')
for cookie_header in set_cookie_headers:
cookie_info = self._parse_cookie(cookie_header)
cookie_data.append(cookie_info)
cookie_name = cookie_info.get('name', 'Unknown')
is_https = str(response.url).startswith('https')
# Check Secure flag
if is_https and not cookie_info.get('secure'):
issues.append(self._create_issue(
category='security',
severity='medium',
title=f"Cookie '{cookie_name}' missing Secure flag",
description='Cookie transmitted over HTTPS should have the Secure flag to prevent transmission over HTTP.',
remediation='Add the Secure flag to the Set-Cookie header.'
))
# Check HttpOnly flag (for session-like cookies)
if not cookie_info.get('httponly'):
# Check if it looks like a session cookie
session_indicators = ['session', 'sess', 'auth', 'token', 'jwt', 'csrf']
if any(ind in cookie_name.lower() for ind in session_indicators):
issues.append(self._create_issue(
category='security',
severity='medium',
title=f"Cookie '{cookie_name}' missing HttpOnly flag",
description='Session cookies should have HttpOnly flag to prevent JavaScript access (XSS protection).',
remediation='Add the HttpOnly flag to the Set-Cookie header.'
))
# Check SameSite attribute
if not cookie_info.get('samesite'):
issues.append(self._create_issue(
category='security',
severity='low',
title=f"Cookie '{cookie_name}' missing SameSite attribute",
description='SameSite attribute helps prevent CSRF attacks.',
remediation='Add SameSite=Strict or SameSite=Lax to the Set-Cookie header.'
))
return issues, cookie_data
def _parse_cookie(self, cookie_header: str) -> dict:
"""Parse a Set-Cookie header into a dictionary."""
parts = cookie_header.split(';')
# First part is name=value
name_value = parts[0].strip()
if '=' in name_value:
name, value = name_value.split('=', 1)
else:
name, value = name_value, ''
cookie_info = {
'name': name.strip(),
'value': value[:50] + '...' if len(value) > 50 else value, # Truncate for privacy
'secure': False,
'httponly': False,
'samesite': None,
'path': None,
'domain': None,
'expires': None,
}
for part in parts[1:]:
part = part.strip().lower()
if part == 'secure':
cookie_info['secure'] = True
elif part == 'httponly':
cookie_info['httponly'] = True
elif part.startswith('samesite='):
cookie_info['samesite'] = part.split('=')[1]
elif part.startswith('path='):
cookie_info['path'] = part.split('=')[1]
elif part.startswith('domain='):
cookie_info['domain'] = part.split('=')[1]
return cookie_info
def _check_cors(self, headers: httpx.Headers, url: str) -> List[dict]:
"""Check CORS configuration for security issues."""
issues = []
acao = headers.get('access-control-allow-origin', '').lower()
acac = headers.get('access-control-allow-credentials', '').lower()
if acao == '*':
if acac == 'true':
issues.append(self._create_issue(
category='cors',
severity='critical',
title='Dangerous CORS configuration',
description='Access-Control-Allow-Origin: * combined with Access-Control-Allow-Credentials: true is a security vulnerability.',
remediation='Never use wildcard origin with credentials. Specify allowed origins explicitly.'
))
else:
issues.append(self._create_issue(
category='cors',
severity='info',
title='CORS allows all origins',
description='Access-Control-Allow-Origin is set to *, allowing any website to make requests.',
remediation='Consider restricting CORS to specific trusted origins if the API handles sensitive data.'
))
return issues
def _check_info_disclosure(self, headers: httpx.Headers) -> List[dict]:
"""Check for information disclosure in headers."""
issues = []
# Check Server header
server = headers.get('server', '')
if server:
# Check for version numbers
import re
if re.search(r'\d+\.\d+', server):
issues.append(self._create_issue(
category='security',
severity='low',
title='Server version disclosed',
description=f'The Server header reveals version information: {server}',
remediation='Configure the server to hide version information.'
))
# Check X-Powered-By
powered_by = headers.get('x-powered-by', '')
if powered_by:
issues.append(self._create_issue(
category='security',
severity='low',
title='X-Powered-By header present',
description=f'The X-Powered-By header reveals technology: {powered_by}',
remediation='Remove the X-Powered-By header to reduce information disclosure.'
))
# Check X-AspNet-Version
aspnet = headers.get('x-aspnet-version', '')
if aspnet:
issues.append(self._create_issue(
category='security',
severity='low',
title='ASP.NET version disclosed',
description=f'X-AspNet-Version header reveals: {aspnet}',
remediation='Disable ASP.NET version header in web.config.'
))
return issues
def _check_tls(self, url: str) -> Tuple[List[dict], dict]:
"""Check TLS/SSL certificate validity."""
issues = []
tls_data = {
'has_tls': False,
'certificate_valid': None,
'issuer': None,
'expires': None,
'protocol': None,
}
parsed = urlparse(url)
# Only check HTTPS URLs
if parsed.scheme != 'https':
issues.append(self._create_issue(
category='tls',
severity='high',
title='Site not served over HTTPS',
description='The site is served over unencrypted HTTP, exposing data to interception.',
remediation='Enable HTTPS with a valid TLS certificate.'
))
return issues, tls_data
hostname = parsed.hostname
port = parsed.port or 443
try:
context = ssl.create_default_context()
with socket.create_connection((hostname, port), timeout=10) as sock:
with context.wrap_socket(sock, server_hostname=hostname) as ssock:
tls_data['has_tls'] = True
tls_data['protocol'] = ssock.version()
cert = ssock.getpeercert()
if cert:
tls_data['certificate_valid'] = True
tls_data['issuer'] = dict(x[0] for x in cert.get('issuer', []))
tls_data['subject'] = dict(x[0] for x in cert.get('subject', []))
tls_data['expires'] = cert.get('notAfter')
# Check for weak protocols
if ssock.version() in ('SSLv2', 'SSLv3', 'TLSv1', 'TLSv1.1'):
issues.append(self._create_issue(
category='tls',
severity='high',
title=f'Weak TLS version: {ssock.version()}',
description='The server supports outdated TLS versions with known vulnerabilities.',
remediation='Disable TLS 1.0 and 1.1. Use TLS 1.2 or higher.'
))
except ssl.SSLCertVerificationError as e:
tls_data['certificate_valid'] = False
issues.append(self._create_issue(
category='tls',
severity='critical',
title='Invalid TLS certificate',
description=f'The TLS certificate failed validation: {e}',
remediation='Obtain and install a valid TLS certificate from a trusted CA.'
))
except ssl.SSLError as e:
tls_data['has_tls'] = False
issues.append(self._create_issue(
category='tls',
severity='high',
title='TLS connection error',
description=f'Could not establish TLS connection: {e}',
remediation='Check TLS configuration and certificate installation.'
))
except (socket.timeout, socket.error) as e:
# Network error, not a TLS issue
pass
return issues, tls_data
def _count_present_headers(self, headers: dict) -> int:
"""Count how many security headers are present."""
headers_lower = {k.lower(): v for k, v in headers.items()}
count = 0
for header_name in self.SECURITY_HEADERS:
if header_name in headers_lower:
count += 1
return count