-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathapi.py
169 lines (142 loc) · 5.55 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import asyncio
from typing import Optional, Any
from megadebrid.libs.base import MegaDebrid
from megadebrid.utils.decorators import renew_obsolete_token
class MegaDebridApi(MegaDebrid):
    """
    Mega-Debrid API: provide the methods to perform the authorized actions through API backend.
    Source: https://www.mega-debrid.eu/index.php?page=api
    WARNING : If you send more than 50 requests per second, your IP address will be banned for 1 day.
    """

    def __init__(self) -> None:
        super().__init__()
        # Token as returned by the configuration; get_token() logs in when it is falsy.
        self.api_token = self.config.get_api_token()

    @property
    def api_url(self) -> str:
        """URL of the API endpoint, built from the base URL."""
        return f"{self.base_url}/api.php"

    async def __aenter__(self):
        """Enter the parent context, then make sure an API token is available."""
        await super().__aenter__()
        await self.get_token()
        return self

    async def _fetch_json(self, send: Any, params: dict[str, Any], **kwargs: Any) -> Any:
        """
        Send one request to the API endpoint and return the decoded JSON body.

        `send` is the bound session method to use (`self.session.get` or
        `self.session.post`); `kwargs` are forwarded to it unchanged.
        """
        async with send(self.api_url, params=params, **kwargs) as response:
            # The expected content type is overridden to "text/html" when decoding;
            # presumably the API labels its JSON answers that way (TODO confirm).
            return await response.json(content_type="text/html")

    async def get_token(self, is_renew: bool = False) -> None:
        """
        Verify if a token is already loaded, else authenticate the user with his credentials.

        `is_renew` is accepted for callers that pass it; this method does not read it.
        Raises `Exception` when no token is available after the login attempt.
        """
        if self.api_token:
            return

        response = await self.connect_user()
        self.api_token = response.get("token")
        if not self.api_token:
            raise Exception(
                "Could not authenticate on Mega-Debrid API: require to have token."
            )
        # TODO(review): persisting the token (self.config.save_api_token) was
        # switched off with an `if False:` guard; confirm before re-enabling.

    async def connect_user(self) -> dict[str, str]:
        """
        Connect user:
        URL: https://www.mega-debrid.eu/api.php?action=connectUser&login=[user_login]&password=[user_password]
        WARNING : After 4 login failed (bad login/password), your IP address will be banned for some minutes.
        """
        credentials = self.config.get_credentials()
        params = {
            "action": "connectUser",
            "login": credentials["Username"],
            "password": credentials["Password"],
        }
        return await self._fetch_json(self.session.get, params)

    @renew_obsolete_token
    async def get_user_history(self) -> dict[str, Any]:
        """
        Get user history:
        URL: https://www.mega-debrid.eu/api.php?action=getUserHistory&token=[token]
        """
        params = {
            "action": "getUserHistory",
            "token": self.api_token,
        }
        return await self._fetch_json(self.session.get, params)

    @renew_obsolete_token
    async def get_hosters_list(self) -> dict[str, Any]:
        """
        Get hosters list:
        URL: https://www.mega-debrid.eu/api.php?action=getHostersList
        """
        # No token is sent with this action.
        params = {"action": "getHostersList"}
        return await self._fetch_json(self.session.get, params)

    @renew_obsolete_token
    async def upload_magnet(self, magnet: str) -> dict[str, Any]:
        """
        Upload torrent (magnet URL of the torrent):
        URL: https://www.mega-debrid.eu/api.php?action=uploadTorrent&token=[token]
        """
        params = {
            "action": "uploadTorrent",
            "token": self.api_token,
        }
        return await self._fetch_json(
            self.session.post, params, data={"magnet": magnet}
        )

    @renew_obsolete_token
    async def upload_torrent(self, torrent: str) -> dict[str, Any]:
        """
        Upload torrent (upload file directly):
        URL: https://www.mega-debrid.eu/api.php?action=uploadTorrent&token=[token]

        `torrent` is the path of the file to upload; it is opened in binary mode.
        """
        params = {
            "action": "uploadTorrent",
            "token": self.api_token,
        }
        # Keep the file inside a `with` block so the handle is released even
        # when the request fails before the body has been sent.
        with open(torrent, "rb") as torrent_file:
            return await self._fetch_json(
                self.session.post, params, data={"file": torrent_file}
            )

    @renew_obsolete_token
    async def get_torrents_list(self) -> dict[str, Any]:
        """
        Get Torrents list:
        URL: https://www.mega-debrid.eu/api.php?action=getTorrents&token=[token]
        """
        params = {
            "action": "getTorrents",
            "token": self.api_token,
        }
        return await self._fetch_json(self.session.get, params)

    @renew_obsolete_token
    async def get_torrent_status(self, torrent_hash: str) -> dict[str, Any]:
        """
        Get torrent information:
        URL: https://www.mega-debrid.eu/api.php?action=getTorrent&token=[token]
        """
        params = {
            "action": "getTorrent",
            "token": self.api_token,
        }
        return await self._fetch_json(
            self.session.post, params, data={"hash": torrent_hash}
        )

    @renew_obsolete_token
    async def debrid_link(
        self, link: str, password: Optional[str] = None
    ) -> dict[str, str]:
        """
        Get debrid link:
        URL: https://www.mega-debrid.eu/api.php?action=getLink&token=[token]

        `password` is passed through `self.hash_passwd` before being sent.
        """
        params = {
            "action": "getLink",
            "token": self.api_token,
        }
        data = {
            "link": link,
            # NOTE(review): hash_passwd also receives None when no password is
            # given; confirm that it handles that case.
            "password": self.hash_passwd(password),
        }
        return await self._fetch_json(self.session.post, params, data=data)