Skip to content

Commit

Permalink
v0.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
kuankuan2007 committed Dec 1, 2024
1 parent bf7071f commit 961755c
Show file tree
Hide file tree
Showing 7 changed files with 350 additions and 166 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ venv/
.venv/
build/
dist/
__pycache__
4 changes: 4 additions & 0 deletions lib/getPlayInfo/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from . import api
from . import page

__all__ = ["api", "page"]
49 changes: 49 additions & 0 deletions lib/getPlayInfo/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from typing import *
import requests
import re
import lib.util as util


def getPlayInfo(video: dict, cookie: str):
return requests.get(
"https://api.bilibili.com/x/player/wbi/playurl",
params={
"avid": video["aid"],
"bvid": video["bvid"],
"cid": video["cid"],
"fnval": "4048",
},
headers=util.getHeader(cookie),
).json()["data"]


def get(video: str, cookie: str):
logger = util.getLogger("AnalyzingBvOrAvApi")
try:
info = {
"aid": (re.findall(r"av([1-9][0-9]*)", video, re.I) or [None])[0],
"bvid": (re.findall(r"BV[0-9a-zA-Z]{10}", video, re.I) or [None])[0],
}
if not info["aid"] and not info["bvid"]:
logger.warning("No av or bv found")
return []
pagelist = requests.get(
"https://api.bilibili.com/x/player/pagelist",
params=info,
headers=util.getHeader(cookie),
).json()["data"]
assert pagelist and type(pagelist) == list, "Can't get page list"
logger.info(f"Got {len(pagelist)} pages")
return [
{
"title": i["part"],
"playinfo": util.toCallback(
getPlayInfo, {**info, "cid": i["cid"]}, cookie
),
}
for i in pagelist
]
except Exception as e:
logger.warning(f"Can't get page list with error {util.errorLogInfo(e)}")
return []

69 changes: 69 additions & 0 deletions lib/getPlayInfo/page.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from typing import *
import requests
import re
import json
import lib.util as util


def get(video: str, cookie: str):
logger = util.getLogger("AnalyzingPage")
url = util.getPageUrl(video)
headers = util.getHeader(cookie, url)
try:
logger.info(f"request page url: {url}, headers: {headers}")
response = requests.get(url, headers=headers, timeout=60)
logger.info(f"response status code: {response.status_code}")
assert response.status_code // 100 == 2
html = response.text
except Exception as e:
logger.warning(f"Can't get page response with error {util.errorLogInfo(e)}")
util.messagebox.showerror(
"错误",
f"请求错误,无法获取页面信息\n{util.errorLogInfo(e)}",
)
return
try:
logger.info("start parse page")
flag = False
with open("a.html", "w", encoding="utf-8") as f:
f.write(html)
for i, maper in [
(
re.compile(r"window.__playinfo__=(.*?)</script>", re.S),
lambda x: x["data"],
),
(
re.compile(
r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>',
re.S,
),
lambda x: [
i["state"]["data"]["result"]["video_info"]
for i in x["props"]["pageProps"]["dehydratedState"]["queries"]
if util.optionalChain(i, "state", "data", "result", "video_info")
][0],
),
]:
try:
palyInfo = maper(json.loads(re.findall(i, html)[0]))
except Exception:
continue
else:
flag = True
break

if not flag:
logger.warning("Can't find playinfo in page")
raise Exception("Can't find playinfo in page")
return [
{
"title": util.optionalChain(
re.findall(r"<title.*>(.*)</title>", html), 0, default="Unknown"
),
"playinfo": lambda: palyInfo,
}
]
except Exception as e:
logger.warning(f"Can't parse page with error {util.errorLogInfo(e)}, return")
util.messagebox.showerror("错误", "解析错误,无法获取视频信息")
return
130 changes: 130 additions & 0 deletions lib/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
from typing import *
import tkinter.messagebox as messagebox
import logging
import re
import pathlib
import sys
import os
import tempfile
import time


def toCallback(func: Callable, *args, **kwargs) -> Callable:
"""
Converts a function to a callback format.
Args:
func (Callable): The function to be converted.
*args: Positional arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Callable: A callback function that calls the original function with the provided arguments.
"""
return lambda *args_callback, **kwargs_callback: func(
*args, *args_callback, **kwargs, **kwargs_callback
)


def getLogger(name: str):
return logging.getLogger(name)


def optionalChain(d: dict | list, *keys: Any, default: Any = None) -> Any:
"""
Chains together a series of keys in a dictionary to retrieve a value.
Args:
d (dict): The dictionary to search.
*keys (str): The keys to chain together.
default (Any, optional): The default value to return if the key chain is invalid. Defaults to None.
Returns:
Any: The value associated with the key chain, or the default value if the key chain is invalid.
"""
for key in keys:
if key in d:
d = d[key]
else:
return default
return d


def getHeader(cookie: str | None = None, referer: str | None = None) -> dict:
return {
"Referer": referer,
"Cookie": cookie,
"Accept": "*/*" "Accept-language:zh-CN,zh;q=0.9,en;q=0.8",
"sec-ch-ua": '"Microsoft Edge";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-site",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0",
}

def getPageUrl(vid: str):
if re.match(
r"^(((ht|f)tps?):\/\/)?([^!@#$%^&*?.\s-]([^!@#$%^&*?.\s]{0,63}[^!@#$%^&*?.\s])?\.)+[a-z]{2,6}\/?",
vid,
):
return vid
elif re.match(
re.compile("av[1-9][0-9]*", re.I),
vid,
) or re.match("(?:B|b)(?:v|V)[0-9a-zA-Z]{10}", vid):
return f"https://www.bilibili.com/video/{vid}?spm_id_from=player_end_recommend_autoplay"
elif re.match(
re.compile("(?:ep|ss)[1-9][0-9]*", re.I),
vid,
):
return f"https://www.bilibili.com/bangumi/play/{vid}"
return None

if getattr(sys, "frozen", None):
dataBasePath = pathlib.Path(sys._MEIPASS)
else:
dataBasePath = pathlib.Path(os.getcwd()).joinpath("data")


def dataPath(filename: str | pathlib.Path):
if isinstance(filename, str):
filename = pathlib.Path(filename)
return dataBasePath.joinpath(filename)

def errorLogInfo(e:BaseException):
return f"{e.__class__.__name__}:{str(e)}"


tempRoot = pathlib.Path(tempfile.gettempdir()).joinpath(
f"k-bilibili-download-{time.time()}"
)
if not tempRoot.exists():
os.mkdir(tempRoot)

fileLogHandler = logging.FileHandler(
filename=str(tempRoot.joinpath("log.txt")),
mode="w",
encoding="utf-8",
)
fmter = logging.Formatter(
fmt="[%(asctime)s] [%(name)s] [t-%(thread)d] [%(levelname)s]: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)

consoleLogHandler = logging.StreamHandler()

rootLogger = logging.getLogger()

rootLogger.addHandler(fileLogHandler)
rootLogger.addHandler(consoleLogHandler)
rootLogger.setLevel(logging.DEBUG)

fileLogHandler.setLevel(logging.INFO)
fileLogHandler.setFormatter(fmter)
consoleLogHandler.setLevel(logging.INFO)
consoleLogHandler.setFormatter(fmter)


__all__ = ["toCallback", "messagebox", "getLogger", "optionalChain", "getHeader"]
Loading

0 comments on commit 961755c

Please sign in to comment.