Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 60 additions & 59 deletions xcompare.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
#!/usr/bin/env python3
"""
Find mutual followers between two X (Twitter) accounts without the official API.

Built and maintained by OffSeq (https://offseq.com/ | https://x.com/offseq |
https://www.linkedin.com/company/offseq | https://infosec.exchange/@offseq).
Built and maintained by OffSeq (https://offseq.com/).
Modified to use curl_cffi to bypass Cloudflare TLS fingerprinting.
"""

from __future__ import annotations
Expand All @@ -13,15 +12,20 @@
import os
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Tuple

try:
from curl_cffi import requests as cffi_requests
except ImportError:
print("Error: curl_cffi not installed. Run: pip install curl_cffi", file=sys.stderr)
sys.exit(1)

PUBLIC_WEB_BEARER = urllib.parse.unquote(
"AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA"
)

GUEST_ACTIVATE_URL = "https://x.com/i/api/1.1/guest/activate.json"
USER_SHOW_URL = "https://x.com/i/api/1.1/users/show.json"
USER_BY_USERNAME_V2 = "https://x.com/i/api/2/users/by/username/{username}"
Expand Down Expand Up @@ -60,7 +64,7 @@ def parse_args() -> argparse.Namespace:
"--delay",
type=float,
default=1.0,
help="Seconds to wait between paginated API calls to stay under rate limits (default: 1).",
help="Seconds to wait between paginated API calls (default: 1).",
)
parser.add_argument(
"--json",
Expand All @@ -80,7 +84,7 @@ def parse_args() -> argparse.Namespace:
return parser.parse_args()


def read_cookie_source(args: argparse.Namespace) -> Optional[CookieBundle]:
def read_cookie_source(args: argparse.Namespace) -> Optional["CookieBundle"]:
raw_cookie: Optional[str] = None
if args.cookie:
raw_cookie = args.cookie
Expand All @@ -93,6 +97,7 @@ def read_cookie_source(args: argparse.Namespace) -> Optional[CookieBundle]:

if raw_cookie is None:
return None

normalized_cookie, bearer = normalize_cookie_blob(raw_cookie)
if not normalized_cookie:
raise XCompareError("Cookie input was empty; check the provided --cookie/--cookie-file.")
Expand All @@ -110,24 +115,25 @@ def parse_cookie_string(raw_cookie: str) -> Dict[str, str]:


def resolve_bearer_token(
args: argparse.Namespace, bundle: Optional[CookieBundle]
args: argparse.Namespace, bundle: Optional["CookieBundle"]
) -> str:
if args.web_bearer:
manual = _sanitize_bearer(args.web_bearer)
if not manual:
raise XCompareError("Provided --web-bearer value was empty.")
return manual

env_token = _sanitize_bearer(os.getenv("X_WEB_BEARER_TOKEN"))
if env_token:
return env_token

if bundle and bundle.bearer:
return bundle.bearer

return PUBLIC_WEB_BEARER


def normalize_cookie_blob(blob: str) -> Tuple[str, Optional[str]]:
"""Accept either a raw Cookie header or a JSON export from DevTools."""

stripped = blob.strip()
if not stripped:
return "", None
Expand All @@ -142,6 +148,7 @@ def normalize_cookie_blob(blob: str) -> Tuple[str, Optional[str]]:
cookie_value = cookie_value or ""
bearer_value = _sanitize_bearer(bearer_value)
return cookie_value, bearer_value

return stripped, None


Expand Down Expand Up @@ -186,63 +193,64 @@ class XWebSession:
cookie_map: Dict[str, str]
guest_token: Optional[str]
bearer_token: str
_session: object = None # curl_cffi session

def __post_init__(self):
# Create a persistent curl_cffi session impersonating Chrome
self._session = cffi_requests.Session(impersonate="chrome124")

@classmethod
def create(cls, raw_cookie: Optional[str], bearer_token: Optional[str]) -> "XWebSession":
token = bearer_token or PUBLIC_WEB_BEARER
cookie_map = parse_cookie_string(raw_cookie) if raw_cookie else {}
guest_token = None
if not raw_cookie:
guest_token = cls._activate_guest_token(token)
return cls(

instance = cls(
raw_cookie=raw_cookie,
cookie_map=cookie_map,
guest_token=guest_token,
bearer_token=token,
)

@staticmethod
def _activate_guest_token(bearer_token: str) -> str:
payload = json.dumps({"client": "web"}).encode()
if not raw_cookie:
instance.guest_token = instance._activate_guest_token(token)

return instance

def _activate_guest_token(self, bearer_token: str) -> str:
headers = {
"Authorization": f"Bearer {bearer_token}",
"Content-Type": "application/json",
"Accept": "application/json",
"User-Agent": "Mozilla/5.0",
"Origin": "https://x.com",
"Referer": "https://x.com/",
"x-twitter-active-user": "yes",
"x-twitter-client-language": "en",
"x-twitter-auth-type": "OAuth2Session",
}
request = urllib.request.Request(
GUEST_ACTIVATE_URL,
data=payload,
headers=headers,
method="POST",
)

try:
with urllib.request.urlopen(request) as response:
payload = json.loads(response.read())
except urllib.error.HTTPError as exc:
body = exc.read().decode() if hasattr(exc, "read") else ""
response = self._session.post(
GUEST_ACTIVATE_URL,
json={"client": "web"},
headers=headers,
)
response.raise_for_status()
payload = response.json()
except Exception as exc:
raise XCompareError(
f"Guest activation failed ({exc.code} {exc.reason}"
+ (f": {body[:200].strip()}" if body else "")
+ "). X may be blocking anonymous sessions; pass --cookie with a logged-in browser session."
f"Guest activation failed: {exc}. "
"X may be blocking anonymous sessions; pass --cookie with a logged-in browser session."
) from exc
except urllib.error.URLError as exc:
raise XCompareError(f"Network error: {exc.reason}") from exc

token = payload.get("guest_token")
if not token:
raise XCompareError("Failed to obtain guest token from X.")
return token

def headers(self) -> Dict[str, str]:
headers = {
hdrs = {
"Authorization": f"Bearer {self.bearer_token}",
"User-Agent": "Mozilla/5.0",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "en-US,en;q=0.9",
"Origin": "https://x.com",
Expand All @@ -252,25 +260,20 @@ def headers(self) -> Dict[str, str]:
"x-twitter-auth-type": "OAuth2Session",
}
if self.raw_cookie:
headers["Cookie"] = self.raw_cookie
hdrs["Cookie"] = self.raw_cookie
if "ct0" in self.cookie_map:
headers["x-csrf-token"] = self.cookie_map["ct0"]
hdrs["x-csrf-token"] = self.cookie_map["ct0"]
elif self.guest_token:
headers["x-guest-token"] = self.guest_token
return headers
hdrs["x-guest-token"] = self.guest_token
return hdrs

def get(self, url: str, params: Optional[Dict[str, str]] = None) -> Dict:
query = urllib.parse.urlencode(params or {})
full_url = f"{url}?{query}" if query else url
request = urllib.request.Request(full_url, headers=self.headers())
try:
with urllib.request.urlopen(request) as response:
return json.loads(response.read())
except urllib.error.HTTPError as exc:
body = exc.read().decode() if hasattr(exc, "read") else ""
raise XCompareError(f"Request failed: {exc.code} {exc.reason}: {body}") from exc
except urllib.error.URLError as exc:
raise XCompareError(f"Network error: {exc.reason}") from exc
response = self._session.get(url, params=params or {}, headers=self.headers())
response.raise_for_status()
return response.json()
except Exception as exc:
raise XCompareError(f"Request failed: {exc}") from exc


def fetch_user(session: XWebSession, username: str) -> Dict:
Expand Down Expand Up @@ -304,9 +307,7 @@ def _fetch_user_v1(session: XWebSession, username: str) -> Dict:

def _fetch_user_v2(session: XWebSession, username: str) -> Dict:
url = USER_BY_USERNAME_V2.format(username=username)
params = {
"user.fields": "id,name,username,verified,protected,public_metrics",
}
params = {"user.fields": "id,name,username,verified,protected,public_metrics"}
payload = session.get(url, params=params)
data = payload.get("data")
if not data:
Expand All @@ -328,11 +329,7 @@ def _looks_like_missing_user(exc: XCompareError) -> bool:


def _fetch_user_typeahead(session: XWebSession, username: str) -> Dict:
params = {
"q": username,
"src": "search_box",
"result_type": "users",
}
params = {"q": username, "src": "search_box", "result_type": "users"}
payload = session.get("https://x.com/i/api/1.1/search/typeahead.json", params=params)
users = payload.get("users", [])
username_lower = username.lower()
Expand All @@ -341,7 +338,6 @@ def _fetch_user_typeahead(session: XWebSession, username: str) -> Dict:
if handle == username_lower:
return user
if users:
# fall back to first if exact match missing
return users[0]
raise XCompareError(f"Unable to resolve @{username} via typeahead")

Expand All @@ -366,20 +362,23 @@ def fetch_followers(
}
payload = session.get(FOLLOWERS_LIST_URL, params=params)
users = payload.get("users", [])

for follower in users:
collected[follower["id_str"]] = follower
if len(collected) >= limit:
break

next_cursor = str(payload.get("next_cursor_str") or payload.get("next_cursor") or "0")
cursor = next_cursor

log(
f"Fetched {len(collected)} follower(s) so far for user {user_id}"
+ ("" if cursor in ("0", "0.0") else " (more available)")
)

if cursor in ("0", "0.0") or not users:
break

if delay > 0:
time.sleep(delay)

Expand Down Expand Up @@ -428,7 +427,6 @@ def print_table(users: Iterable[Dict]) -> None:
if not users:
print("No overlapping followers found.")
return

print(f"Found {len(users)} overlapping follower(s):\n")
for user in users:
print(f" • {describe_user(user)}")
Expand All @@ -438,7 +436,6 @@ def build_logger(quiet: bool):
def log(message: str) -> None:
if not quiet:
print(message, file=sys.stderr)

return log


Expand All @@ -450,9 +447,12 @@ def main() -> None:
cookie_bundle = read_cookie_source(args)
bearer_token = resolve_bearer_token(args, cookie_bundle)
cookie_value = cookie_bundle.cookie if cookie_bundle else None

session = XWebSession.create(cookie_value, bearer_token=bearer_token)

user_a = fetch_user(session, args.user_a)
user_b = fetch_user(session, args.user_b)

log(f"Resolved @{args.user_a} -> {user_a['id_str']}")
log(f"Resolved @{args.user_b} -> {user_b['id_str']}")

Expand All @@ -472,6 +472,7 @@ def main() -> None:
)

overlaps = overlapping_followers(followers_a, followers_b)

except XCompareError as exc:
print(f"Error: {exc}", file=sys.stderr)
sys.exit(1)
Expand Down