secure-web/backend/scanner/scanners/tls.py

381 lines
16 KiB
Python

"""
TLS/SSL Security Scanner.
This module checks TLS/SSL configuration and certificate validity.
"""
import ipaddress
import logging
import socket
import ssl
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from urllib.parse import urlparse

from .base import (
    BaseScanner,
    ScannerResult,
    ScannerStatus,
    IssueData,
    MetricData,
)
logger = logging.getLogger(__name__)
class TLSScanner(BaseScanner):
    """
    Scanner for TLS/SSL certificate and configuration.

    Checks:
    - HTTPS availability (for URLs given with the ``http`` scheme)
    - Certificate validity (verification performed by the ``ssl`` module)
    - Certificate expiration
    - Certificate CN / SAN entries match the scanned hostname
    - HTTP to HTTPS redirect

    ``self.config``, ``self.logger`` and ``_create_error_result`` are used
    here but not defined in this class; they are expected to be provided by
    ``BaseScanner``.
    """

    name = "tls_check"

    # A certificate with at most this many whole days left is reported as
    # "expiring very soon" (high) / "expiring soon" (medium).
    EXPIRY_HIGH_DAYS = 7
    EXPIRY_MEDIUM_DAYS = 30

    # HTTP status codes that are treated as a redirect response.
    REDIRECT_STATUS_CODES = (301, 302, 303, 307, 308)

    # subjectAltName entry types that can identify the scanned host.
    _HOST_SAN_TYPES = ('DNS', 'IP Address')

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Create the scanner.

        Args:
            config: Optional settings dict. The ``timeout`` key (default 10)
                is passed as the timeout for socket connections and for the
                HTTP redirect probe.
        """
        super().__init__(config)
        self.timeout = self.config.get('timeout', 10)

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------
    def run(self, url: str) -> ScannerResult:
        """
        Run TLS/SSL analysis on the URL.

        Args:
            url: The URL to analyze

        Returns:
            ScannerResult with TLS findings. Any exception raised while
            scanning (including a URL without a host) is converted into an
            error result via ``_create_error_result``.
        """
        self.logger.info("Starting TLS scan for %s", url)
        try:
            parsed = urlparse(url)
            # ``parsed.hostname`` strips userinfo, port and IPv6 brackets;
            # splitting netloc on ':' does none of that correctly.
            hostname = parsed.hostname
            if not hostname:
                raise ValueError(f"Cannot determine hostname from URL: {url!r}")
            port = parsed.port or (443 if parsed.scheme == 'https' else 80)

            # Plain-HTTP URLs only get the "is HTTPS available?" probe.
            if parsed.scheme == 'http':
                return self._scan_plain_http(url, hostname)

            issues = []
            metrics = []
            raw_data = {}

            # For HTTPS URLs, check certificate
            cert_info = self._get_certificate_info(hostname, port)
            raw_data['certificate'] = cert_info

            if cert_info.get('error'):
                issues.append(self._issue(
                    severity='critical',
                    title='Certificate validation failed',
                    description=f"SSL certificate error: {cert_info['error']}",
                    affected_url=url,
                    remediation=(
                        'Ensure your SSL certificate is valid, not expired, '
                        'and properly configured for your domain.'
                    ),
                ))
                metrics.append(
                    self._metric('certificate_valid', 'Certificate Valid', 0.0)
                )
            else:
                # Handshake and verification succeeded.
                metrics.append(
                    self._metric('certificate_valid', 'Certificate Valid', 1.0)
                )
                metrics.append(
                    self._metric('tls_enabled', 'TLS Enabled', 1.0)
                )

                expiry_issues, expiry_metrics = self._expiry_findings(
                    url, cert_info.get('expires')
                )
                metrics.extend(expiry_metrics)
                issues.extend(expiry_issues)

                mismatch = self._hostname_issue(url, hostname, cert_info)
                if mismatch is not None:
                    issues.append(mismatch)

            # Check for HTTP to HTTPS redirect
            if parsed.scheme == 'https':
                redirect_info = self._check_http_redirect(hostname)
                raw_data['http_redirect'] = redirect_info
                redirect_issue = self._redirect_issue(hostname, redirect_info)
                if redirect_issue is not None:
                    issues.append(redirect_issue)

            self.logger.info("TLS scan complete: %d issues", len(issues))
            return ScannerResult(
                scanner_name=self.name,
                status=ScannerStatus.SUCCESS,
                issues=issues,
                metrics=metrics,
                raw_data=raw_data
            )
        except Exception as e:
            return self._create_error_result(e)

    # ------------------------------------------------------------------
    # Finding builders
    # ------------------------------------------------------------------
    def _issue(
        self,
        *,
        severity: str,
        title: str,
        description: str,
        remediation: str,
        affected_url: str,
    ) -> IssueData:
        """Build an IssueData in the 'tls' category attributed to this scanner."""
        return IssueData(
            category='tls',
            severity=severity,
            title=title,
            description=description,
            tool=self.name,
            affected_url=affected_url,
            remediation=remediation,
        )

    def _metric(
        self,
        name: str,
        display_name: str,
        value: float,
        unit: str = 'score',
    ) -> MetricData:
        """Build a MetricData attributed to this scanner."""
        return MetricData(
            name=name,
            display_name=display_name,
            value=value,
            unit=unit,
            source=self.name,
        )

    def _scan_plain_http(self, url: str, hostname: str) -> ScannerResult:
        """Produce the result for a URL that was given with the http scheme."""
        https_available, https_result = self._check_https_available(hostname)
        raw_data = {
            'https_available': https_available,
            'https_check': https_result,
        }

        if https_available:
            issue = self._issue(
                severity='high',
                title='Site accessed over HTTP but HTTPS is available',
                description=(
                    'The site was accessed over unencrypted HTTP, but HTTPS '
                    'appears to be available. All traffic should use HTTPS.'
                ),
                affected_url=url,
                remediation=(
                    'Redirect all HTTP traffic to HTTPS using a 301 redirect. '
                    'Implement HSTS to prevent future HTTP access.'
                ),
            )
        else:
            issue = self._issue(
                severity='critical',
                title='Site does not support HTTPS',
                description=(
                    'The site does not appear to have HTTPS configured. '
                    'All data transmitted is unencrypted and vulnerable to interception.'
                ),
                affected_url=url,
                remediation=(
                    'Configure TLS/SSL for your server. Obtain a certificate from '
                    "Let's Encrypt (free) or a commercial CA."
                ),
            )

        return ScannerResult(
            scanner_name=self.name,
            status=ScannerStatus.SUCCESS,
            issues=[issue],
            metrics=[self._metric('tls_enabled', 'TLS Enabled', 0.0)],
            raw_data=raw_data
        )

    def _expiry_findings(self, url: str, not_after: Optional[str]) -> tuple:
        """
        Evaluate the certificate's notAfter timestamp.

        Args:
            url: URL recorded on any issue raised.
            not_after: The certificate's ``notAfter`` string, or a falsy
                value when unavailable.

        Returns:
            ``(issues, metrics)`` lists; both empty when the timestamp is
            missing or cannot be parsed (a warning is logged in that case).
        """
        if not not_after:
            return [], []

        try:
            # cert_time_to_seconds parses the certificate date format
            # independently of the process locale, unlike strptime('%b ...').
            expires = datetime.fromtimestamp(
                ssl.cert_time_to_seconds(not_after), tz=timezone.utc
            )
        except (ValueError, TypeError, OverflowError, OSError) as e:
            self.logger.warning("Could not parse certificate expiry: %s", e)
            return [], []

        now = datetime.now(timezone.utc)
        days_until_expiry = (expires - now).days
        metrics = [self._metric(
            'certificate_days_until_expiry',
            'Days Until Certificate Expiry',
            float(days_until_expiry),
            unit='count',
        )]

        issues = []
        # Compare instants, not whole days: a certificate with a few hours
        # left has ``days == 0`` but has not expired yet.
        if expires <= now:
            issues.append(self._issue(
                severity='critical',
                title='SSL certificate has expired',
                description=(
                    f"The SSL certificate expired on {not_after}. "
                    "Users will see security warnings."
                ),
                affected_url=url,
                remediation='Renew your SSL certificate immediately.',
            ))
        elif days_until_expiry <= self.EXPIRY_HIGH_DAYS:
            issues.append(self._issue(
                severity='high',
                title='SSL certificate expiring very soon',
                description=(
                    f"The SSL certificate will expire in {days_until_expiry} days "
                    f"(on {not_after}). Renew immediately."
                ),
                affected_url=url,
                remediation='Renew your SSL certificate before it expires.',
            ))
        elif days_until_expiry <= self.EXPIRY_MEDIUM_DAYS:
            issues.append(self._issue(
                severity='medium',
                title='SSL certificate expiring soon',
                description=(
                    f"The SSL certificate will expire in {days_until_expiry} days "
                    f"(on {not_after}). Plan for renewal."
                ),
                affected_url=url,
                remediation=(
                    'Renew your SSL certificate before expiration. '
                    "Consider using auto-renewal with Let's Encrypt."
                ),
            ))
        return issues, metrics

    def _hostname_issue(
        self,
        url: str,
        hostname: str,
        cert_info: Dict,
    ) -> Optional[IssueData]:
        """
        Return a mismatch issue if the certificate does not cover hostname.

        Returns None when the certificate matches, or when it exposes
        neither a subject nor subjectAltName entries to compare against.
        """
        subject = cert_info.get('subject') or ()
        san = cert_info.get('subjectAltName') or ()
        if not subject and not san:
            return None

        # ``subject`` is a sequence of RDNs, each a sequence of (key, value)
        # pairs; the last commonName found wins.
        subject_cn = ''
        for rdn in subject:
            for key, value in rdn:
                if key == 'commonName':
                    subject_cn = value

        # IP Address entries are included so that sites scanned by IP are
        # not reported as mismatched when the certificate lists that IP.
        san_names = [
            value for kind, value in san if kind in self._HOST_SAN_TYPES
        ]

        if self._hostname_matches_cert(hostname, subject_cn, san_names):
            return None

        cert_name = subject_cn or ', '.join(san_names)
        return self._issue(
            severity='high',
            title='Certificate hostname mismatch',
            description=(
                f"The SSL certificate is for '{cert_name}' but "
                f"the site is accessed as '{hostname}'."
            ),
            affected_url=url,
            remediation=(
                'Obtain a certificate that includes your domain name, '
                'or add it to the Subject Alternative Names (SAN).'
            ),
        )

    def _redirect_issue(
        self,
        hostname: str,
        redirect_info: Dict,
    ) -> Optional[IssueData]:
        """
        Return an issue when HTTP answered but did not redirect to HTTPS.

        A probe that failed outright (``error`` key present, e.g. port 80
        closed or timed out) is inconclusive and yields no issue; the error
        remains visible in the scan's raw data.
        """
        if redirect_info.get('redirects_to_https'):
            return None
        if redirect_info.get('error'):
            self.logger.info(
                "HTTP redirect check inconclusive for %s: %s",
                hostname, redirect_info['error'],
            )
            return None
        return self._issue(
            severity='medium',
            title='No HTTP to HTTPS redirect',
            description=(
                'The site does not redirect HTTP requests to HTTPS. '
                'Users accessing via HTTP will use an insecure connection.'
            ),
            affected_url=f"http://{self._url_host(hostname)}",
            remediation=(
                'Configure your server to redirect all HTTP (port 80) '
                'requests to HTTPS (port 443) with a 301 redirect.'
            ),
        )

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _url_host(hostname: str) -> str:
        """Return hostname as it must appear in a URL (IPv6 gets brackets)."""
        return f"[{hostname}]" if ':' in hostname else hostname

    @staticmethod
    def _as_ip(value: str):
        """Return an ipaddress object for value, or None if it is not an IP."""
        try:
            return ipaddress.ip_address(value)
        except ValueError:
            return None

    # ------------------------------------------------------------------
    # Network probes
    # ------------------------------------------------------------------
    def _check_https_available(self, hostname: str) -> tuple:
        """
        Check if HTTPS is available for the hostname on port 443.

        Returns:
            ``(available, details)``. A TLS-level failure (``ssl.SSLError``,
            which includes certificate verification errors) still counts as
            available because the server answered with TLS; any other
            exception counts as unavailable.
        """
        try:
            context = ssl.create_default_context()
            with socket.create_connection(
                (hostname, 443), timeout=self.timeout
            ) as sock:
                with context.wrap_socket(sock, server_hostname=hostname) as ssock:
                    return True, {'available': True, 'protocol': ssock.version()}
        except ssl.SSLError as e:
            return True, {'available': True, 'error': str(e)}
        except Exception as e:
            return False, {'available': False, 'error': str(e)}

    def _get_certificate_info(self, hostname: str, port: int = 443) -> Dict:
        """
        Get SSL certificate information.

        Connects with a default (verifying) SSL context, so a certificate
        that fails verification produces an error entry rather than details.

        Returns:
            On success, a dict with ``subject``, ``issuer``, ``version``,
            ``serialNumber``, ``notBefore``, ``expires``, ``subjectAltName``,
            ``protocol`` and ``cipher``. On failure, ``{'error': message}``.
        """
        try:
            context = ssl.create_default_context()
            with socket.create_connection(
                (hostname, port), timeout=self.timeout
            ) as sock:
                with context.wrap_socket(sock, server_hostname=hostname) as ssock:
                    cert = ssock.getpeercert()
                    return {
                        'subject': cert.get('subject'),
                        'issuer': cert.get('issuer'),
                        'version': cert.get('version'),
                        'serialNumber': cert.get('serialNumber'),
                        'notBefore': cert.get('notBefore'),
                        'expires': cert.get('notAfter'),
                        'subjectAltName': cert.get('subjectAltName', []),
                        'protocol': ssock.version(),
                        'cipher': ssock.cipher(),
                    }
        # Most specific first: SSLCertVerificationError subclasses SSLError.
        except ssl.SSLCertVerificationError as e:
            return {'error': f"Certificate verification failed: {e.verify_message}"}
        except ssl.SSLError as e:
            return {'error': f"SSL error: {str(e)}"}
        except socket.timeout:
            return {'error': "Connection timed out"}
        except Exception as e:
            return {'error': str(e)}

    def _hostname_matches_cert(
        self,
        hostname: str,
        cn: str,
        san_names: list
    ) -> bool:
        """
        Check if hostname matches certificate CN or SAN.

        Comparison is case-insensitive and ignores a trailing dot. A
        wildcard entry (``*.example.com``) matches exactly one non-empty
        leftmost label, so it covers ``www.example.com`` but neither
        ``example.com``, ``a.b.example.com`` nor ``badexample.com``.
        When hostname is an IP address it is compared by value against
        entries that are IP addresses; wildcards never apply to IPs.

        Args:
            hostname: Host the site was accessed as.
            cn: Certificate subject commonName (may be empty).
            san_names: Names taken from the certificate's subjectAltName.

        Returns:
            True if any certificate name covers hostname.
        """
        host = (hostname or '').lower().rstrip('.')
        if not host:
            return False
        host_ip = self._as_ip(host)

        for raw_name in [cn, *san_names]:
            if not raw_name:
                continue
            candidate = raw_name.lower().rstrip('.')

            if host_ip is not None:
                # Compare parsed addresses so differing textual forms of
                # the same IPv6 address still match.
                if self._as_ip(candidate) == host_ip:
                    return True
                continue

            if candidate == host:
                return True

            # Handle wildcard certificates
            if candidate.startswith('*.'):
                suffix = candidate[1:]  # keeps the leading dot: ".example.com"
                if host.endswith(suffix):
                    # Ensure wildcard only matches one (non-empty) level
                    label = host[:-len(suffix)]
                    if label and '.' not in label:
                        return True
        return False

    def _check_http_redirect(self, hostname: str) -> Dict:
        """
        Check if HTTP redirects to HTTPS.

        Issues a single GET to ``http://<hostname>`` without following
        redirects.

        Returns:
            A dict with ``redirects_to_https`` and, when a response was
            received, ``status_code`` (plus ``location`` for redirect
            responses). If the request fails, ``error`` holds the message.
        """
        import httpx

        target = f"http://{self._url_host(hostname)}"
        try:
            with httpx.Client(
                timeout=self.timeout,
                follow_redirects=False
            ) as client:
                response = client.get(target)
                if response.status_code in self.REDIRECT_STATUS_CODES:
                    location = response.headers.get('location', '')
                    # URL schemes are case-insensitive.
                    redirects_to_https = (
                        location.strip().lower().startswith('https://')
                    )
                    return {
                        'redirects_to_https': redirects_to_https,
                        'status_code': response.status_code,
                        'location': location,
                    }
                return {
                    'redirects_to_https': False,
                    'status_code': response.status_code,
                }
        except Exception as e:
            return {
                'redirects_to_https': False,
                'error': str(e),
            }