| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375 |
- #!/usr/bin/env python3
- """Sync App Store Connect subscriptions from Paywall.storekit."""
- from __future__ import annotations
- import json
- import sys
- import time
- from pathlib import Path
- import jwt
- import requests
- API_BASE = "https://api.appstoreconnect.apple.com/v1"
- ROOT = Path(__file__).resolve().parents[1]
- REPO_ROOT = ROOT.parent
- ENV_PATH = ROOT / ".env"
- STOREKIT_PATH = REPO_ROOT / "App for Indeed" / "Paywall.storekit"
- MANIFEST_PATH = ROOT / "subscriptions.manifest.json"
- BASE_TERRITORY = "USA"
- LOCALE = "en-US"
- def load_env(path: Path) -> dict[str, str]:
- values: dict[str, str] = {}
- for line in path.read_text(encoding="utf-8").splitlines():
- line = line.strip()
- if not line or line.startswith("#") or "=" not in line:
- continue
- key, value = line.split("=", 1)
- values[key.strip()] = value.strip()
- return values
- def storekit_locale_to_asc(locale: str) -> str:
- return locale.replace("_", "-")
- def period_from_iso8601(value: str) -> str:
- mapping = {
- "P1W": "ONE_WEEK",
- "P1M": "ONE_MONTH",
- "P1Y": "ONE_YEAR",
- }
- if value not in mapping:
- raise ValueError(f"Unsupported subscription period {value!r}")
- return mapping[value]
- class ASCClient:
- def __init__(self, issuer_id: str, key_id: str, private_key_path: str):
- self.issuer_id = issuer_id
- self.key_id = key_id
- self.private_key = Path(private_key_path).expanduser().read_text(encoding="utf-8")
- self.session = requests.Session()
- self._token: str | None = None
- self._token_exp = 0.0
- def _ensure_token(self) -> str:
- now = time.time()
- if not self._token or now >= self._token_exp - 60:
- issued_at = int(now)
- payload = {
- "iss": self.issuer_id,
- "iat": issued_at,
- "exp": issued_at + 1200,
- "aud": "appstoreconnect-v1",
- }
- self._token = jwt.encode(
- payload,
- self.private_key,
- algorithm="ES256",
- headers={"kid": self.key_id, "typ": "JWT"},
- )
- self._token_exp = issued_at + 1200
- return self._token
- def request(self, method: str, path: str, **kwargs) -> requests.Response:
- headers = kwargs.pop("headers", {})
- headers["Authorization"] = f"Bearer {self._ensure_token()}"
- headers.setdefault("Content-Type", "application/json")
- url = path if path.startswith("http") else f"{API_BASE}{path}"
- return self.session.request(method, url, headers=headers, timeout=90, **kwargs)
- def get_json(self, path: str, params: dict | None = None) -> dict:
- response = self.request("GET", path, params=params)
- if not response.ok:
- raise RuntimeError(f"GET {path} failed ({response.status_code}): {response.text}")
- return response.json()
- def post(self, resource_type: str, attributes: dict, relationships: dict) -> dict:
- body = {
- "data": {
- "type": resource_type,
- "attributes": attributes,
- "relationships": relationships,
- }
- }
- collection = resource_type
- response = self.request("POST", f"/{collection}", json=body)
- if not response.ok:
- raise RuntimeError(
- f"POST {resource_type} failed ({response.status_code}): {response.text}"
- )
- return response.json()["data"]
- def find_app(client: ASCClient, bundle_id: str) -> dict:
- data = client.get_json("/apps", params={"filter[bundleId]": bundle_id, "limit": 1})
- apps = data.get("data", [])
- if not apps:
- raise RuntimeError(f"No app found for bundle id {bundle_id!r}")
- return apps[0]
- def load_storekit(path: Path) -> dict:
- return json.loads(path.read_text(encoding="utf-8"))
- def list_subscription_groups(client: ASCClient, app_id: str) -> list[dict]:
- data = client.get_json(
- f"/apps/{app_id}/subscriptionGroups",
- params={"include": "subscriptions", "limit": 50},
- )
- groups = data.get("data", [])
- included = {item["id"]: item for item in data.get("included", []) if item["type"] == "subscriptions"}
- for group in groups:
- rel_ids = [
- item["id"]
- for item in group.get("relationships", {})
- .get("subscriptions", {})
- .get("data", [])
- ]
- group["_subscriptions"] = [included[i] for i in rel_ids if i in included]
- return groups
- def find_or_create_group(client: ASCClient, app_id: str, reference_name: str) -> dict:
- for group in list_subscription_groups(client, app_id):
- if group.get("attributes", {}).get("referenceName") == reference_name:
- return group
- created = client.post(
- "subscriptionGroups",
- {"referenceName": reference_name},
- {"app": {"data": {"type": "apps", "id": app_id}}},
- )
- created["_subscriptions"] = []
- return created
- def ensure_group_localization(
- client: ASCClient, group_id: str, group_name: str, app_display_name: str
- ) -> None:
- data = client.get_json(f"/subscriptionGroups/{group_id}/subscriptionGroupLocalizations")
- for loc in data.get("data", []):
- if loc.get("attributes", {}).get("locale") == LOCALE:
- return
- client.post(
- "subscriptionGroupLocalizations",
- {"name": group_name, "locale": LOCALE, "customAppName": app_display_name},
- {"subscriptionGroup": {"data": {"type": "subscriptionGroups", "id": group_id}}},
- )
- def find_subscription_by_product_id(subscriptions: list[dict], product_id: str) -> dict | None:
- for sub in subscriptions:
- if sub.get("attributes", {}).get("productId") == product_id:
- return sub
- return None
- def create_subscription(
- client: ASCClient, group_id: str, spec: dict, group_number: int
- ) -> dict:
- loc = spec["localizations"][0]
- return client.post(
- "subscriptions",
- {
- "name": spec["referenceName"],
- "productId": spec["productID"],
- "subscriptionPeriod": period_from_iso8601(spec["recurringSubscriptionPeriod"]),
- "groupLevel": group_number,
- },
- {"group": {"data": {"type": "subscriptionGroups", "id": group_id}}},
- )
- def ensure_subscription_localization(client: ASCClient, sub_id: str, spec: dict) -> None:
- loc = spec["localizations"][0]
- asc_locale = storekit_locale_to_asc(loc["locale"])
- data = client.get_json(f"/subscriptions/{sub_id}/subscriptionLocalizations")
- for item in data.get("data", []):
- if item.get("attributes", {}).get("locale") == asc_locale:
- return
- client.post(
- "subscriptionLocalizations",
- {
- "name": loc["displayName"],
- "description": loc["description"],
- "locale": asc_locale,
- },
- {"subscription": {"data": {"type": "subscriptions", "id": sub_id}}},
- )
- def ensure_availability(client: ASCClient, sub_id: str) -> None:
- try:
- client.get_json(f"/subscriptionAvailabilities/{sub_id}")
- return
- except RuntimeError:
- pass
- client.post(
- "subscriptionAvailabilities",
- {"availableInNewTerritories": True},
- {
- "subscription": {"data": {"type": "subscriptions", "id": sub_id}},
- "availableTerritories": {"data": [{"type": "territories", "id": BASE_TERRITORY}]},
- },
- )
- def price_point_for_amount(client: ASCClient, sub_id: str, amount: str) -> str:
- path = f"/subscriptions/{sub_id}/pricePoints?filter[territory]={BASE_TERRITORY}&limit=200"
- while path:
- data = client.get_json(path)
- for point in data.get("data", []):
- if point.get("attributes", {}).get("customerPrice") == amount:
- return point["id"]
- next_url = data.get("links", {}).get("next")
- if not next_url:
- break
- path = next_url.replace(API_BASE, "")
- raise RuntimeError(f"No {BASE_TERRITORY} price point for ${amount} on subscription {sub_id}")
- def ensure_price(client: ASCClient, sub_id: str, amount: str) -> None:
- prices = client.get_json(f"/subscriptions/{sub_id}/prices")
- if prices.get("meta", {}).get("paging", {}).get("total", 0) > 0:
- return
- price_point_id = price_point_for_amount(client, sub_id, amount)
- client.post(
- "subscriptionPrices",
- {},
- {
- "subscription": {"data": {"type": "subscriptions", "id": sub_id}},
- "subscriptionPricePoint": {
- "data": {"type": "subscriptionPricePoints", "id": price_point_id}
- },
- },
- )
- def has_intro_offer(client: ASCClient, sub_id: str) -> bool:
- data = client.get_json(f"/subscriptions/{sub_id}/introductoryOffers")
- return data.get("meta", {}).get("paging", {}).get("total", 0) > 0
- def ensure_free_trial(client: ASCClient, sub_id: str, days: int) -> None:
- if has_intro_offer(client, sub_id):
- return
- duration = {3: "THREE_DAYS"}.get(days)
- if duration is None:
- raise ValueError(f"Unsupported free-trial length: {days} days")
- client.post(
- "subscriptionIntroductoryOffers",
- {"offerMode": "FREE_TRIAL", "duration": duration, "numberOfPeriods": 1},
- {
- "subscription": {"data": {"type": "subscriptions", "id": sub_id}},
- "territory": {"data": {"type": "territories", "id": BASE_TERRITORY}},
- },
- )
- def intro_days_from_storekit(offer: dict | None) -> int | None:
- if not offer:
- return None
- if offer.get("paymentMode") != "free":
- return None
- period = offer.get("subscriptionPeriod", "")
- if period == "P3D":
- return 3
- return None
- def main() -> int:
- if not ENV_PATH.exists():
- print(f"Missing {ENV_PATH}", file=sys.stderr)
- return 1
- if not STOREKIT_PATH.exists():
- print(f"Missing {STOREKIT_PATH}", file=sys.stderr)
- return 1
- env = load_env(ENV_PATH)
- issuer = env.get("APP_STORE_CONNECT_ISSUER_ID", "")
- key_id = env.get("APP_STORE_CONNECT_KEY_ID", "")
- key_path = env.get("APP_STORE_CONNECT_PRIVATE_KEY_PATH", "")
- bundle_id = env.get("APP_STORE_CONNECT_BUNDLE_ID", "")
- if not all([issuer, key_id, key_path, bundle_id]):
- print("Fill App Store Connect credentials in app-connect/.env", file=sys.stderr)
- return 1
- storekit = load_storekit(STOREKIT_PATH)
- groups = storekit.get("subscriptionGroups", [])
- if len(groups) != 1:
- print("Expected exactly one subscription group in Paywall.storekit", file=sys.stderr)
- return 1
- sk_group = groups[0]
- client = ASCClient(issuer, key_id, key_path)
- app = find_app(client, bundle_id)
- app_id = app["id"]
- print(f"App: {app['attributes'].get('name')} ({app_id})")
- asc_group = find_or_create_group(client, app_id, sk_group["name"])
- group_id = asc_group["id"]
- print(f"Subscription group: {sk_group['name']} ({group_id})")
- ensure_group_localization(client, group_id, sk_group["name"], "App for Indeed")
- manifest: dict = {
- "subscriptionGroupId": group_id,
- "subscriptionGroupName": sk_group["name"],
- "products": [],
- }
- existing = list_subscription_groups(client, app_id)
- current_subs: list[dict] = []
- for group in existing:
- if group["id"] == group_id:
- current_subs = group.get("_subscriptions", [])
- break
- for spec in sorted(sk_group["subscriptions"], key=lambda s: s["groupNumber"]):
- product_id = spec["productID"]
- sub = find_subscription_by_product_id(current_subs, product_id)
- if sub is None:
- sub = create_subscription(client, group_id, spec, spec["groupNumber"])
- print(f"Created subscription {product_id} ({sub['id']})")
- current_subs.append(sub)
- else:
- print(f"Found subscription {product_id} ({sub['id']})")
- sub_id = sub["id"]
- ensure_subscription_localization(client, sub_id, spec)
- ensure_availability(client, sub_id)
- ensure_price(client, sub_id, spec["displayPrice"])
- trial_days = intro_days_from_storekit(spec.get("introductoryOffer"))
- if trial_days:
- ensure_free_trial(client, sub_id, trial_days)
- print(f" Free trial: {trial_days} days")
- state = sub.get("attributes", {}).get("state")
- refreshed = client.get_json(f"/subscriptions/{sub_id}")
- state = refreshed["data"]["attributes"].get("state", state)
- manifest["products"].append(
- {
- "productId": product_id,
- "subscriptionId": sub_id,
- "referenceName": spec["referenceName"],
- "displayPrice": spec["displayPrice"],
- "period": spec["recurringSubscriptionPeriod"],
- "state": state,
- }
- )
- print(f" State: {state}")
- MANIFEST_PATH.write_text(json.dumps(manifest, indent=2) + "\n", encoding="utf-8")
- print(f"Wrote manifest: {MANIFEST_PATH}")
- print("Done. Review subscriptions in App Store Connect and submit for review when ready.")
- return 0
- if __name__ == "__main__":
- raise SystemExit(main())
|