#!/usr/bin/env python3 """Sync App Store Connect subscriptions from Paywall.storekit.""" from __future__ import annotations import json import sys import time from pathlib import Path import jwt import requests API_BASE = "https://api.appstoreconnect.apple.com/v1" ROOT = Path(__file__).resolve().parents[1] REPO_ROOT = ROOT.parent ENV_PATH = ROOT / ".env" STOREKIT_PATH = REPO_ROOT / "App for Indeed" / "Paywall.storekit" MANIFEST_PATH = ROOT / "subscriptions.manifest.json" BASE_TERRITORY = "USA" LOCALE = "en-US" def load_env(path: Path) -> dict[str, str]: values: dict[str, str] = {} for line in path.read_text(encoding="utf-8").splitlines(): line = line.strip() if not line or line.startswith("#") or "=" not in line: continue key, value = line.split("=", 1) values[key.strip()] = value.strip() return values def storekit_locale_to_asc(locale: str) -> str: return locale.replace("_", "-") def period_from_iso8601(value: str) -> str: mapping = { "P1W": "ONE_WEEK", "P1M": "ONE_MONTH", "P1Y": "ONE_YEAR", } if value not in mapping: raise ValueError(f"Unsupported subscription period {value!r}") return mapping[value] class ASCClient: def __init__(self, issuer_id: str, key_id: str, private_key_path: str): self.issuer_id = issuer_id self.key_id = key_id self.private_key = Path(private_key_path).expanduser().read_text(encoding="utf-8") self.session = requests.Session() self._token: str | None = None self._token_exp = 0.0 def _ensure_token(self) -> str: now = time.time() if not self._token or now >= self._token_exp - 60: issued_at = int(now) payload = { "iss": self.issuer_id, "iat": issued_at, "exp": issued_at + 1200, "aud": "appstoreconnect-v1", } self._token = jwt.encode( payload, self.private_key, algorithm="ES256", headers={"kid": self.key_id, "typ": "JWT"}, ) self._token_exp = issued_at + 1200 return self._token def request(self, method: str, path: str, **kwargs) -> requests.Response: headers = kwargs.pop("headers", {}) headers["Authorization"] = f"Bearer {self._ensure_token()}" headers.setdefault("Content-Type", "application/json") url = path if path.startswith("http") else f"{API_BASE}{path}" return self.session.request(method, url, headers=headers, timeout=90, **kwargs) def get_json(self, path: str, params: dict | None = None) -> dict: response = self.request("GET", path, params=params) if not response.ok: raise RuntimeError(f"GET {path} failed ({response.status_code}): {response.text}") return response.json() def post(self, resource_type: str, attributes: dict, relationships: dict) -> dict: body = { "data": { "type": resource_type, "attributes": attributes, "relationships": relationships, } } collection = resource_type response = self.request("POST", f"/{collection}", json=body) if not response.ok: raise RuntimeError( f"POST {resource_type} failed ({response.status_code}): {response.text}" ) return response.json()["data"] def find_app(client: ASCClient, bundle_id: str) -> dict: data = client.get_json("/apps", params={"filter[bundleId]": bundle_id, "limit": 1}) apps = data.get("data", []) if not apps: raise RuntimeError(f"No app found for bundle id {bundle_id!r}") return apps[0] def load_storekit(path: Path) -> dict: return json.loads(path.read_text(encoding="utf-8")) def list_subscription_groups(client: ASCClient, app_id: str) -> list[dict]: data = client.get_json( f"/apps/{app_id}/subscriptionGroups", params={"include": "subscriptions", "limit": 50}, ) groups = data.get("data", []) included = {item["id"]: item for item in data.get("included", []) if item["type"] == "subscriptions"} for group in groups: rel_ids = [ item["id"] for item in group.get("relationships", {}) .get("subscriptions", {}) .get("data", []) ] group["_subscriptions"] = [included[i] for i in rel_ids if i in included] return groups def find_or_create_group(client: ASCClient, app_id: str, reference_name: str) -> dict: for group in list_subscription_groups(client, app_id): if group.get("attributes", {}).get("referenceName") == reference_name: return group created = client.post( "subscriptionGroups", {"referenceName": reference_name}, {"app": {"data": {"type": "apps", "id": app_id}}}, ) created["_subscriptions"] = [] return created def ensure_group_localization( client: ASCClient, group_id: str, group_name: str, app_display_name: str ) -> None: data = client.get_json(f"/subscriptionGroups/{group_id}/subscriptionGroupLocalizations") for loc in data.get("data", []): if loc.get("attributes", {}).get("locale") == LOCALE: return client.post( "subscriptionGroupLocalizations", {"name": group_name, "locale": LOCALE, "customAppName": app_display_name}, {"subscriptionGroup": {"data": {"type": "subscriptionGroups", "id": group_id}}}, ) def find_subscription_by_product_id(subscriptions: list[dict], product_id: str) -> dict | None: for sub in subscriptions: if sub.get("attributes", {}).get("productId") == product_id: return sub return None def create_subscription( client: ASCClient, group_id: str, spec: dict, group_number: int ) -> dict: loc = spec["localizations"][0] return client.post( "subscriptions", { "name": spec["referenceName"], "productId": spec["productID"], "subscriptionPeriod": period_from_iso8601(spec["recurringSubscriptionPeriod"]), "groupLevel": group_number, }, {"group": {"data": {"type": "subscriptionGroups", "id": group_id}}}, ) def ensure_subscription_localization(client: ASCClient, sub_id: str, spec: dict) -> None: loc = spec["localizations"][0] asc_locale = storekit_locale_to_asc(loc["locale"]) data = client.get_json(f"/subscriptions/{sub_id}/subscriptionLocalizations") for item in data.get("data", []): if item.get("attributes", {}).get("locale") == asc_locale: return client.post( "subscriptionLocalizations", { "name": loc["displayName"], "description": loc["description"], "locale": asc_locale, }, {"subscription": {"data": {"type": "subscriptions", "id": sub_id}}}, ) def ensure_availability(client: ASCClient, sub_id: str) -> None: try: client.get_json(f"/subscriptionAvailabilities/{sub_id}") return except RuntimeError: pass client.post( "subscriptionAvailabilities", {"availableInNewTerritories": True}, { "subscription": {"data": {"type": "subscriptions", "id": sub_id}}, "availableTerritories": {"data": [{"type": "territories", "id": BASE_TERRITORY}]}, }, ) def price_point_for_amount(client: ASCClient, sub_id: str, amount: str) -> str: path = f"/subscriptions/{sub_id}/pricePoints?filter[territory]={BASE_TERRITORY}&limit=200" while path: data = client.get_json(path) for point in data.get("data", []): if point.get("attributes", {}).get("customerPrice") == amount: return point["id"] next_url = data.get("links", {}).get("next") if not next_url: break path = next_url.replace(API_BASE, "") raise RuntimeError(f"No {BASE_TERRITORY} price point for ${amount} on subscription {sub_id}") def ensure_price(client: ASCClient, sub_id: str, amount: str) -> None: prices = client.get_json(f"/subscriptions/{sub_id}/prices") if prices.get("meta", {}).get("paging", {}).get("total", 0) > 0: return price_point_id = price_point_for_amount(client, sub_id, amount) client.post( "subscriptionPrices", {}, { "subscription": {"data": {"type": "subscriptions", "id": sub_id}}, "subscriptionPricePoint": { "data": {"type": "subscriptionPricePoints", "id": price_point_id} }, }, ) def has_intro_offer(client: ASCClient, sub_id: str) -> bool: data = client.get_json(f"/subscriptions/{sub_id}/introductoryOffers") return data.get("meta", {}).get("paging", {}).get("total", 0) > 0 def ensure_free_trial(client: ASCClient, sub_id: str, days: int) -> None: if has_intro_offer(client, sub_id): return duration = {3: "THREE_DAYS"}.get(days) if duration is None: raise ValueError(f"Unsupported free-trial length: {days} days") client.post( "subscriptionIntroductoryOffers", {"offerMode": "FREE_TRIAL", "duration": duration, "numberOfPeriods": 1}, { "subscription": {"data": {"type": "subscriptions", "id": sub_id}}, "territory": {"data": {"type": "territories", "id": BASE_TERRITORY}}, }, ) def intro_days_from_storekit(offer: dict | None) -> int | None: if not offer: return None if offer.get("paymentMode") != "free": return None period = offer.get("subscriptionPeriod", "") if period == "P3D": return 3 return None def main() -> int: if not ENV_PATH.exists(): print(f"Missing {ENV_PATH}", file=sys.stderr) return 1 if not STOREKIT_PATH.exists(): print(f"Missing {STOREKIT_PATH}", file=sys.stderr) return 1 env = load_env(ENV_PATH) issuer = env.get("APP_STORE_CONNECT_ISSUER_ID", "") key_id = env.get("APP_STORE_CONNECT_KEY_ID", "") key_path = env.get("APP_STORE_CONNECT_PRIVATE_KEY_PATH", "") bundle_id = env.get("APP_STORE_CONNECT_BUNDLE_ID", "") if not all([issuer, key_id, key_path, bundle_id]): print("Fill App Store Connect credentials in app-connect/.env", file=sys.stderr) return 1 storekit = load_storekit(STOREKIT_PATH) groups = storekit.get("subscriptionGroups", []) if len(groups) != 1: print("Expected exactly one subscription group in Paywall.storekit", file=sys.stderr) return 1 sk_group = groups[0] client = ASCClient(issuer, key_id, key_path) app = find_app(client, bundle_id) app_id = app["id"] print(f"App: {app['attributes'].get('name')} ({app_id})") asc_group = find_or_create_group(client, app_id, sk_group["name"]) group_id = asc_group["id"] print(f"Subscription group: {sk_group['name']} ({group_id})") ensure_group_localization(client, group_id, sk_group["name"], "App for Indeed") manifest: dict = { "subscriptionGroupId": group_id, "subscriptionGroupName": sk_group["name"], "products": [], } existing = list_subscription_groups(client, app_id) current_subs: list[dict] = [] for group in existing: if group["id"] == group_id: current_subs = group.get("_subscriptions", []) break for spec in sorted(sk_group["subscriptions"], key=lambda s: s["groupNumber"]): product_id = spec["productID"] sub = find_subscription_by_product_id(current_subs, product_id) if sub is None: sub = create_subscription(client, group_id, spec, spec["groupNumber"]) print(f"Created subscription {product_id} ({sub['id']})") current_subs.append(sub) else: print(f"Found subscription {product_id} ({sub['id']})") sub_id = sub["id"] ensure_subscription_localization(client, sub_id, spec) ensure_availability(client, sub_id) ensure_price(client, sub_id, spec["displayPrice"]) trial_days = intro_days_from_storekit(spec.get("introductoryOffer")) if trial_days: ensure_free_trial(client, sub_id, trial_days) print(f" Free trial: {trial_days} days") state = sub.get("attributes", {}).get("state") refreshed = client.get_json(f"/subscriptions/{sub_id}") state = refreshed["data"]["attributes"].get("state", state) manifest["products"].append( { "productId": product_id, "subscriptionId": sub_id, "referenceName": spec["referenceName"], "displayPrice": spec["displayPrice"], "period": spec["recurringSubscriptionPeriod"], "state": state, } ) print(f" State: {state}") MANIFEST_PATH.write_text(json.dumps(manifest, indent=2) + "\n", encoding="utf-8") print(f"Wrote manifest: {MANIFEST_PATH}") print("Done. Review subscriptions in App Store Connect and submit for review when ready.") return 0 if __name__ == "__main__": raise SystemExit(main())