forked from sibalzer/primelooter
-
Notifications
You must be signed in to change notification settings - Fork 3
/
experiment.py
71 lines (60 loc) · 4.28 KB
/
experiment.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import httpx
import json
import asyncio
import re
import logging
import http.cookiejar as cookiejar
# GraphQL endpoint; every POST request in this module is sent to this URL.
gql_url = "https://gaming.amazon.com/graphql"
# Raise the "httpx" logger's threshold to WARNING so its lower-level records are dropped.
logging.getLogger("httpx").setLevel(logging.WARNING)
# getLogger() without a name returns the root logger; this module logs through it.
log = logging.getLogger()
# Request body for the "OffersContext_Offers_And_Items" GraphQL query.
# The query selects LOOT-collection items (page size 999) and, for each item,
# its offers with their claim eligibility flags plus the game's title and publisher.
offers_payload = {
"operationName": "OffersContext_Offers_And_Items",
"variables": {"pageSize": 999},
"extensions": {},
"query": "query OffersContext_Offers_And_Items($dateOverride: Time, $pageSize: Int) {\n inGameLoot: items(\n collectionType: LOOT\n dateOverride: $dateOverride\n pageSize: $pageSize\n ) {\n items {\n ...Item\n __typename\n }\n __typename\n }\n}\n\nfragment Item on Item {\n id\n isDirectEntitlement\n requiresLinkBeforeClaim\n grantsCode\n isDeepLink\n isFGWP\n offers {\n ...Item_Offer\n __typename\n }\n game {\n ...Game\n __typename\n }\n __typename\n}\n\n\nfragment Item_Offer on Offer {\n id\n offerSelfConnection {\n eligibility {\n ...Item_Offer_Eligibility\n __typename\n }\n __typename\n }\n __typename\n}\n\nfragment Item_Offer_Eligibility on OfferEligibility {\n isClaimed\n canClaim\n missingRequiredAccountLink\n}\n\nfragment Game on GameV2 {\n id\n assets {\n title\n publisher\n }\n}\n", # noqa: E501 TODO: This needs to become a non \n formatted string somewhere, just not doing it now
}
async def claim_offer(offer_id: str, item: dict, client: httpx.AsyncClient, headers: dict) -> None:
    """Claim a single offer unless it is already claimed or blocked by an account link.

    Args:
        offer_id: Identifier of the offer to place an order for.
        item: One item record from the offers query. It must provide
            ``offers[0]["offerSelfConnection"]["eligibility"]`` and
            ``game["assets"]["title"]``.
        client: HTTP client used to POST the ``placeOrdersDetailPage`` mutation.
        headers: Headers sent along with the mutation request.

    Returns:
        None. Every outcome (skipped, claimed, failed) is reported through
        the module logger only.
    """
    eligibility = item["offers"][0]["offerSelfConnection"]["eligibility"]
    if eligibility["isClaimed"]:
        # Nothing to do for offers that were collected earlier.
        return

    title = item["game"]["assets"]["title"]
    if eligibility["canClaim"] is False and eligibility["missingRequiredAccountLink"] is True:
        log.error(f"Cannot collect game `{title}`, account link required.")
        return

    log.info(f"Collecting offer for {title}")
    claim_payload = {
        "operationName": "placeOrdersDetailPage",
        "variables": {
            "input": {
                "offerIds": [offer_id],
                "attributionChannel": '{"eventId":"ItemDetailRootPage:' + offer_id + '","page":"ItemDetailPage"}',
            }
        },
        "extensions": {},
        "query": "fragment Place_Orders_Payload_Order_Information on OfferOrderInformation {\n catalogOfferId\n claimCode\n entitledAccountId\n entitledAccountName\n id\n orderDate\n orderState\n __typename\n}\n\nmutation placeOrdersDetailPage($input: PlaceOrdersInput!) {\n placeOrders(input: $input) {\n error {\n code\n __typename\n }\n orderInformation {\n ...Place_Orders_Payload_Order_Information\n __typename\n }\n __typename\n }\n}\n",  # noqa: E501 TODO: This needs to become a non \n formatted string somewhere, just not doing it now
    }

    # `content=` is httpx's parameter for a pre-encoded body; passing a str via
    # `data=` is deprecated there. The bytes on the wire are unchanged.
    response = await client.post(gql_url, headers=headers, content=json.dumps(claim_payload))

    # Decode the reply once. A body that is not JSON at all is logged instead
    # of raising.
    try:
        body = response.json()
    except ValueError:
        log.error(f"Error: non-JSON response (HTTP {response.status_code}) while claiming `{title}`")
        return

    # A GraphQL failure may come back with "data": null or without
    # "placeOrders"; log it rather than failing on a missing key.
    data = body.get("data") if isinstance(body, dict) else None
    place_orders = data.get("placeOrders") if isinstance(data, dict) else None
    if not isinstance(place_orders, dict):
        log.error(f"Error: unexpected response while claiming `{title}`: {body!r}")
        return

    error = place_orders.get("error")
    if error is not None:
        log.error(f"Error: {error}")
async def primelooter(cookie_file):
    """Claim every in-game loot offer returned by the offers query.

    Loads cookies from *cookie_file*, fetches the home page to obtain the CSRF
    token, requests the list of loot items and then runs ``claim_offer`` for
    each item concurrently.

    Args:
        cookie_file: Path to a cookie file in the Mozilla/Netscape
            ``cookies.txt`` format (it is read with ``MozillaCookieJar``).

    Raises:
        IndexError: If no ``csrf-key`` field is found in the home page. The
            exception type is the one the previous ``matches[0]`` lookup
            raised; it now carries an explanatory message.
    """
    jar = cookiejar.MozillaCookieJar(cookie_file)
    jar.load()

    base_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/117.0",
    }
    json_headers = base_headers | {
        "Content-Type": "application/json",
    }

    async with httpx.AsyncClient() as client:
        for cookie in jar:
            client.cookies.jar.set_cookie(cookie)

        html_body = (await client.get("https://gaming.amazon.com/home", headers=base_headers)).text

        # Capture up to the closing quote only; a greedy `.*` would swallow
        # anything up to the last quote on the same line.
        csrf_match = re.search(r"name='csrf-key' value='([^']*)'", html_body)
        if csrf_match is None:
            raise IndexError(
                "csrf-key not found in https://gaming.amazon.com/home; "
                "the cookies in the cookie file may be missing or expired"
            )
        json_headers["csrf-token"] = csrf_match.group(1)

        # `content=` is httpx's parameter for a pre-encoded body; passing a str
        # via `data=` is deprecated there.
        response = await client.post(gql_url, headers=json_headers, content=json.dumps(offers_payload))
        data = response.json()["data"]["inGameLoot"]["items"]

        # Awaiting gather() keeps every claim alive until it has finished, so
        # the results (all None) do not need to be stored. Items that carry no
        # offer are skipped because there is nothing to claim for them.
        await asyncio.gather(
            *(
                claim_offer(item["offers"][0]["id"], item, client, json_headers)
                for item in data
                if item["offers"]
            )
        )