#!/usr/bin/env python3
"""
find-reality.py — Find usable REALITY destinations from RealiTLScanner output.

Usage:
    ./find-reality.py sermon.csv [--out results.csv] [--workers 20]

Input CSV (RealiTLScanner format):
    IP,ORIGIN,CERT_DOMAIN,CERT_ISSUER,GEO_CODE

Pipeline:
    1. Drop AWS-internal / fake / placeholder cert domains.
    2. Resolve A records — drop CNAMEs, NXDOMAIN, IP mismatches.
    3. Probe HTTPS over TLS 1.3 — capture HTTP code, body size, title, server.
    4. Classify as PASS / REVIEW / FAIL via heuristics + content blocklist.

Edit the lists below to tune filtering for new patterns you discover.
"""

import argparse
import csv
import ipaddress
import re
import shutil
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

# ============================================================
# Configurable lists
# ============================================================

# Cert domains we never bother probing.
# Keep this list to GENERIC, well-known patterns (AWS official suffixes,
# fake/default certs, IP-as-domain). Don't add specific SaaS or DDNS providers
# you happened to see in a scan — CNAME and IP-mismatch filters in resolve()
# already catch multi-tenant SaaS and CDN fronts, and listing specific vendors
# here leaks information about your scan range.
CERT_DOMAIN_BLOCK = [
    # AWS
    r'\.amazonaws\.com$',
    r'\.aws\.dev$',
    r'\.amazonlightsail\.com$',
    r'\.sagemaker\.aws$',
    r'\.aws\.a2z\.com$',
    r'\.amazon\.dev$',
    r'\.execute-api\.',
    # Azure
    r'\.azurewebsites\.net$',
    r'\.cloudapp\.azure\.com$',
    r'\.cloudapp\.net$',
    r'\.azure-api\.net$',
    r'\.azureedge\.net$',
    r'\.azurefd\.net$',
    r'\.trafficmanager\.net$',
    r'\.azurecontainer\.io$',
    # GCP
    r'\.appspot\.com$',
    r'\.run\.app$',
    r'\.cloudfunctions\.net$',
    r'\.googleapis\.com$',
    r'\.googleusercontent\.com$',
    r'\.firebaseapp\.com$',
    r'\.web\.app$',
    # Cloudflare PaaS
    r'\.workers\.dev$',
    r'\.pages\.dev$',
    # Mainstream PaaS
    r'\.herokuapp\.com$',
    r'\.vercel\.app$',
    r'\.netlify\.app$',
    r'\.fly\.dev$',
    r'\.onrender\.com$',
    # Alibaba / Tencent (relevant for APAC scans)
    r'\.aliyuncs\.com$',
    r'\.myqcloud\.com$',
    # Generic placeholders / fakes
    r'^Cloudflare$',
    r'Fake Certificate',
    r'^localhost$',
    r'^\d+\.\d+\.\d+\.\d+$',
]

# Title / page-content keywords that flag a candidate as REVIEW (not auto-PASS).
# Generic categories only — sensitive content, default/parked pages.
TITLE_BLOCK_KEYWORDS = [
    # Gambling / casino
    'casino', 'lottery', 'slot', 'poker', 'sportsbook', 'taruhan', 'judi',
    '博彩', '赌', '彩票',
    # Adult / chat
    'video chat', 'porn', 'adult', 'sex chat',
    # Default / placeholder pages
    'welcome to nginx', 'apache2 ubuntu default',
    'site is created successfully', 'test page for the apache', 'it works!',
    # Maintenance / parked / dead
    'website has been stopped', 'this domain is for sale', 'parked',
    'sorry, the website',
]

# Cert-domain substrings hinting at gambling. Keep generic.
DOMAIN_HINT_BLOCK = [
    '777', '888', '999', 'casino', 'sportsbook', 'judi', 'taruhan', 'lottery',
]

# Servers that mean we hit a load balancer / mesh, not the real backend.
SERVER_BLOCK_KEYWORDS = [
    'awselb',
    'istio-envoy',
]

# Defaults
DIG_TIMEOUT = 3
CURL_TIMEOUT = 8
MIN_BODY_SIZE = 1024
UA = ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
      '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')

# HTTP codes that are not an automatic PASS but still deserve a human look.
_REVIEW_CODES = (200, 301, 302, 307, 401, 403, 404)

# One DNS label: letters, digits, '_' and inner '-'. Certificate names come
# from arbitrary scanned hosts, so anything else must never reach dig's argv
# (a leading '-', '+' or '@' would be parsed as an option / server).
_LABEL = r'[A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?'
_HOSTNAME_RE = re.compile(rf'{_LABEL}(?:\.{_LABEL})*\.?')


# ============================================================
# Data
# ============================================================

@dataclass
class Candidate:
    """One (scanned IP, certificate domain) pair moving through the pipeline."""

    ip: str                     # address the scanner connected to
    cert_domain: str            # domain exactly as it appears in the CSV
    probe_domain: str           # name actually resolved / probed
    issuer: str = ''
    geo: str = ''
    dns_status: str = ''        # 'OK' | 'CNAME' | 'MISMATCH' | 'NXDOMAIN'
    dns_ips: List[str] = field(default_factory=list)
    http_code: int = 0          # 0 means no HTTP response was obtained
    http_size: int = 0
    server: str = ''
    title: str = ''
    classification: str = ''    # 'PASS' | 'REVIEW' | 'FAIL'
    notes: str = ''


# ============================================================
# Stage 1 — filter
# ============================================================

def is_blocked_cert(domain: str) -> bool:
    """Return True if *domain* matches any pattern in CERT_DOMAIN_BLOCK."""
    return any(re.search(p, domain, re.IGNORECASE) for p in CERT_DOMAIN_BLOCK)


def normalize(domain: str) -> str:
    """Strip a leading ``*.`` wildcard label, leaving other names untouched."""
    return domain[2:] if domain.startswith('*.') else domain


def load_candidates(path: str) -> List[Candidate]:
    """Read the scanner CSV at *path* and return de-duplicated candidates.

    The first row is treated as the header and skipped. Rows with fewer than
    five columns, rows with an empty cert domain, and rows whose cert domain
    (raw or with the wildcard stripped) is on the blocklist are dropped.
    Duplicates are detected on the (ip, probe_domain) pair.
    """
    out: List[Candidate] = []
    seen = set()
    # Certificate fields are arbitrary text from remote hosts: decode
    # explicitly and never abort the whole run on one undecodable byte.
    with open(path, newline='', encoding='utf-8', errors='replace') as f:
        reader = csv.reader(f)
        next(reader, None)
        for row in reader:
            if len(row) < 5:
                continue
            ip, cert, issuer, geo = (row[i].strip() for i in (0, 2, 3, 4))
            # An empty name has nothing to resolve or probe.
            if not cert or is_blocked_cert(cert):
                continue
            probe_name = normalize(cert)
            if not probe_name or is_blocked_cert(probe_name):
                continue
            key = (ip, probe_name)
            if key in seen:
                continue
            seen.add(key)
            out.append(Candidate(ip=ip, cert_domain=cert,
                                 probe_domain=probe_name,
                                 issuer=issuer, geo=geo))
    return out


# ============================================================
# Stage 2 — DNS
# ============================================================

def _parse_dig_answer(answer: str) -> Tuple[bool, List[str]]:
    """Extract (has_cname, a_record_ips) from ``dig +noall +answer`` text."""
    has_cname = False
    ips: List[str] = []
    for line in answer.splitlines():
        parts = line.split()
        # Answer lines are: name, ttl, class, type, rdata.
        if len(parts) < 5:
            continue
        if parts[3] == 'CNAME':
            has_cname = True
        elif parts[3] == 'A':
            ips.append(parts[4])
    return has_cname, ips


def dig_a(domain: str) -> Tuple[bool, List[str]]:
    """Return (has_cname, list_of_a_record_ips).

    Returns ``(False, [])`` when the name is not a plain hostname, when dig
    times out or is not installed, or when the answer section is empty.
    """
    # Refuse anything that is not hostname-shaped before it reaches argv.
    if len(domain) > 253 or not _HOSTNAME_RE.fullmatch(domain):
        return False, []
    try:
        r = subprocess.run(
            ['dig', '+noall', '+answer', f'+time={DIG_TIMEOUT}', '+tries=1',
             domain, 'A'],
            capture_output=True, text=True, timeout=DIG_TIMEOUT + 2,
        )
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return False, []
    ans = r.stdout.strip()
    if not ans:
        return False, []
    return _parse_dig_answer(ans)


def resolve(c: Candidate) -> Candidate:
    """Fill in ``dns_ips`` / ``dns_status`` for *c* and return it.

    For wildcard certs whose bare domain has no A record, the ``www.`` name
    is tried instead and, if it resolves, becomes the probe domain.
    Any lookup that yields no A record (including timeouts) is recorded as
    'NXDOMAIN'.
    """
    has_cname, ips = dig_a(c.probe_domain)
    if not ips and c.cert_domain.startswith('*.'):
        # wildcard cert — try www. variant
        www_name = f'www.{c.probe_domain}'
        www_cname, www_ips = dig_a(www_name)
        if www_ips:
            c.probe_domain = www_name
            has_cname, ips = www_cname, www_ips
    c.dns_ips = ips
    if not ips:
        c.dns_status = 'NXDOMAIN'
    elif has_cname:
        c.dns_status = 'CNAME'
    elif c.ip in ips:
        c.dns_status = 'OK'
    else:
        c.dns_status = 'MISMATCH'
    return c


# ============================================================
# Stage 3 — HTTPS probe (TLS 1.3 only)
# ============================================================

# First <title> element; the closing tag is not required so a truncated body
# still yields its title.
TITLE_RE = re.compile(r'<title\b[^>]*>([^<]*)', re.IGNORECASE)
# Trailer appended by curl's -w format: "[[STATUS]]<code>|<bytes>".
STATUS_RE = re.compile(r'\[\[STATUS\]\](\d+)\|(\d+)\s*$')


def _run_curl(options: List[str], url: str, limit: int) -> str:
    """Run curl with *options* against *url* and return its stdout.

    Output is decoded as UTF-8 regardless of locale so the non-ASCII title
    keywords can match. Raises subprocess.TimeoutExpired after *limit*
    seconds and FileNotFoundError if curl is not installed.
    """
    r = subprocess.run(
        ['curl', *options, '-A', UA, url],
        capture_output=True, text=True, encoding='utf-8', errors='replace',
        timeout=limit,
    )
    return r.stdout


def _parse_probe_output(out: str) -> Tuple[int, int, str]:
    """Split curl output into (http_code, body_size, title)."""
    m = STATUS_RE.search(out)
    if m:
        code, size = int(m.group(1)), int(m.group(2))
        html = out[:m.start()]
    else:
        code, size = 0, 0
        html = out
    tm = TITLE_RE.search(html)
    # Collapse internal whitespace so multi-line titles stay on one row.
    title = ' '.join(tm.group(1).split())[:80] if tm else ''
    return code, size, title


def probe(c: Candidate) -> Candidate:
    """Fetch ``https://<probe_domain>/`` over TLS 1.3 and record the result.

    Candidates whose DNS status is not 'OK' are returned untouched. On a curl
    timeout or a missing curl binary, ``notes`` is set and the HTTP fields
    keep their defaults.
    """
    if c.dns_status != 'OK':
        return c
    # probe_domain was validated by dig_a() before dns_status became 'OK'.
    url = f'https://{c.probe_domain}/'
    try:
        body = _run_curl(
            ['-sS', '--max-time', str(CURL_TIMEOUT),
             '--tlsv1.3', '--tls-max', '1.3',
             '--connect-timeout', '5',
             '-w', '\n[[STATUS]]%{http_code}|%{size_download}'],
            url, CURL_TIMEOUT + 4,
        )
    except subprocess.TimeoutExpired:
        c.notes = 'curl timeout'
        return c
    except FileNotFoundError:
        c.notes = 'curl missing'
        return c
    c.http_code, c.http_size, c.title = _parse_probe_output(body)

    # Server header (separate HEAD request — fewer redirects, cheap)
    try:
        headers = _run_curl(
            ['-sSI', '--max-time', '5', '--connect-timeout', '3'], url, 8)
    except (subprocess.TimeoutExpired, FileNotFoundError):
        # Best effort only: the server name is optional for classification.
        return c
    for line in headers.splitlines():
        if line.lower().startswith('server:'):
            c.server = line.split(':', 1)[1].strip()[:40]
            break
    return c


# ============================================================
# Stage 4 — classify
# ============================================================

def first_match(needles, haystack: str) -> Optional[str]:
    """Return the first needle that is a substring of *haystack*, else None."""
    return next((n for n in needles if n in haystack), None)


def classify(c: Candidate) -> Candidate:
    """Set ``classification`` (PASS / REVIEW / FAIL) and ``notes`` on *c*.

    FAIL: DNS not OK, no HTTP response, or an HTTP code outside the review set.
    PASS: HTTP 200, body of at least MIN_BODY_SIZE bytes, and no blocklist hit
    in title, server header or domain.
    REVIEW: everything else with a reviewable HTTP code; ``notes`` names the
    first reason found.
    """
    if c.dns_status != 'OK':
        c.classification = 'FAIL'
        c.notes = (c.notes + ' ' + c.dns_status).strip()
        return c
    if c.http_code == 0:
        c.classification = 'FAIL'
        c.notes = (c.notes + ' no_response').strip()
        return c

    bad_kw = first_match(TITLE_BLOCK_KEYWORDS, c.title.lower())
    bad_srv = first_match(SERVER_BLOCK_KEYWORDS, c.server.lower())
    bad_dom = first_match(DOMAIN_HINT_BLOCK, c.probe_domain.lower())
    flagged = bool(bad_kw or bad_srv or bad_dom)

    if c.http_code == 200 and c.http_size >= MIN_BODY_SIZE and not flagged:
        c.classification = 'PASS'
    elif c.http_code in _REVIEW_CODES:
        c.classification = 'REVIEW'
        if bad_kw:
            c.notes = f'kw:{bad_kw}'
        elif bad_dom:
            c.notes = f'dom:{bad_dom}'
        elif bad_srv:
            c.notes = f'server:{bad_srv}'
        elif c.http_size < MIN_BODY_SIZE:
            c.notes = 'small_body'
        else:
            c.notes = f'http_{c.http_code}'
    else:
        c.classification = 'FAIL'
        c.notes = f'http_{c.http_code}'
    return c


# ============================================================
# Main
# ============================================================

def _positive_int(text: str) -> int:
    """argparse type: an integer >= 1 (ThreadPoolExecutor rejects less)."""
    value = int(text)
    if value < 1:
        raise argparse.ArgumentTypeError('must be at least 1')
    return value


def _ip_sort_key(c: Candidate) -> Tuple[int, int, str]:
    """Sort key ordering candidates numerically by IP, unparsable ones first."""
    try:
        addr = ipaddress.ip_address(c.ip)
    except ValueError:
        return (0, 0, c.ip)
    return (addr.version, int(addr), c.ip)


def main() -> int:
    """Run the full pipeline; return the process exit code."""
    ap = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    ap.add_argument('csv', help='RealiTLScanner output CSV')
    ap.add_argument('--out', default='', help='write full results CSV here')
    ap.add_argument('--workers', type=_positive_int, default=20)
    args = ap.parse_args()

    # Without this check a missing dig silently turns every candidate into
    # NXDOMAIN, which looks like a legitimate (empty) result.
    missing = [tool for tool in ('dig', 'curl') if shutil.which(tool) is None]
    if missing:
        print(f'error: required tool(s) not found in PATH: '
              f'{", ".join(missing)}', file=sys.stderr)
        return 2

    candidates = load_candidates(args.csv)
    print(f'[1/3] loaded {len(candidates)} candidates after cert-domain filter',
          file=sys.stderr)

    with ThreadPoolExecutor(max_workers=args.workers) as ex:
        candidates = list(ex.map(resolve, candidates))
    ok = [c for c in candidates if c.dns_status == 'OK']
    print(f'[2/3] DNS: {len(ok)} OK / {len(candidates)} total '
          f'({sum(1 for c in candidates if c.dns_status == "CNAME")} CNAME, '
          f'{sum(1 for c in candidates if c.dns_status == "MISMATCH")} mismatch, '
          f'{sum(1 for c in candidates if c.dns_status == "NXDOMAIN")} nx)',
          file=sys.stderr)

    # probe() mutates the Candidate objects in place; list() drains the
    # iterator so worker exceptions surface here.
    with ThreadPoolExecutor(max_workers=args.workers) as ex:
        list(ex.map(probe, ok))

    for c in candidates:
        classify(c)

    pass_list = [c for c in candidates if c.classification == 'PASS']
    review_list = [c for c in candidates if c.classification == 'REVIEW']
    print(f'[3/3] {len(pass_list)} PASS / {len(review_list)} REVIEW / '
          f'{len(candidates) - len(pass_list) - len(review_list)} FAIL',
          file=sys.stderr)

    print()
    print('=== PASS ===')
    print(f'{"IP":<16} {"DOMAIN":<45} TITLE')
    for c in sorted(pass_list, key=_ip_sort_key):
        print(f'{c.ip:<16} {c.probe_domain:<45} {c.title[:60]}')

    if review_list:
        print()
        print('=== REVIEW (worth a glance) ===')
        for c in sorted(review_list, key=_ip_sort_key):
            print(f'{c.ip:<16} {c.probe_domain:<40} '
                  f'http={c.http_code:<3} size={c.http_size:<6} '
                  f'note={c.notes:<14} title={c.title[:40]}')

    if args.out:
        with open(args.out, 'w', newline='', encoding='utf-8') as f:
            w = csv.writer(f)
            w.writerow(['ip', 'cert_domain', 'probe_domain', 'dns_status',
                        'a_records', 'http_code', 'http_size', 'server',
                        'title', 'classification', 'notes'])
            for c in candidates:
                w.writerow([c.ip, c.cert_domain, c.probe_domain, c.dns_status,
                            ','.join(c.dns_ips), c.http_code, c.http_size,
                            c.server, c.title, c.classification, c.notes])
        print(f'\nwrote full results to {args.out}', file=sys.stderr)

    return 0


if __name__ == '__main__':
    sys.exit(main())