From 961755c08777400d8a341638b433c0935a2386c3 Mon Sep 17 00:00:00 2001
From: kuankuan2007 <2163826131@qq.com>
Date: Sun, 1 Dec 2024 22:43:51 +0800
Subject: [PATCH] v0.2.0
---
.gitignore | 1 +
lib/getPlayInfo/__init__.py | 4 +
lib/getPlayInfo/api.py | 49 +++++++
lib/getPlayInfo/page.py | 69 ++++++++++
lib/util.py | 130 ++++++++++++++++++
main.py | 262 +++++++++++++-----------------------
requirements.txt | 1 -
7 files changed, 350 insertions(+), 166 deletions(-)
create mode 100644 lib/getPlayInfo/__init__.py
create mode 100644 lib/getPlayInfo/api.py
create mode 100644 lib/getPlayInfo/page.py
create mode 100644 lib/util.py
diff --git a/.gitignore b/.gitignore
index 1e893aa..b0cbac7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,3 +2,4 @@ venv/
.venv/
build/
dist/
+__pycache__
diff --git a/lib/getPlayInfo/__init__.py b/lib/getPlayInfo/__init__.py
new file mode 100644
index 0000000..44040a3
--- /dev/null
+++ b/lib/getPlayInfo/__init__.py
@@ -0,0 +1,4 @@
+from . import api
+from . import page
+
+__all__ = ["api", "page"]
\ No newline at end of file
diff --git a/lib/getPlayInfo/api.py b/lib/getPlayInfo/api.py
new file mode 100644
index 0000000..fa714cb
--- /dev/null
+++ b/lib/getPlayInfo/api.py
@@ -0,0 +1,49 @@
+from typing import *
+import requests
+import re
+import lib.util as util
+
+
+def getPlayInfo(video: dict, cookie: str):
+ return requests.get(
+ "https://api.bilibili.com/x/player/wbi/playurl",
+ params={
+ "avid": video["aid"],
+ "bvid": video["bvid"],
+ "cid": video["cid"],
+ "fnval": "4048",
+ },
+ headers=util.getHeader(cookie),
+ ).json()["data"]
+
+
+def get(video: str, cookie: str):
+ logger = util.getLogger("AnalyzingBvOrAvApi")
+ try:
+ info = {
+ "aid": (re.findall(r"av([1-9][0-9]*)", video, re.I) or [None])[0],
+ "bvid": (re.findall(r"BV[0-9a-zA-Z]{10}", video, re.I) or [None])[0],
+ }
+ if not info["aid"] and not info["bvid"]:
+ logger.warning("No av or bv found")
+ return []
+ pagelist = requests.get(
+ "https://api.bilibili.com/x/player/pagelist",
+ params=info,
+ headers=util.getHeader(cookie),
+ ).json()["data"]
+ assert pagelist and type(pagelist) == list, "Can't get page list"
+ logger.info(f"Got {len(pagelist)} pages")
+ return [
+ {
+ "title": i["part"],
+ "playinfo": util.toCallback(
+ getPlayInfo, {**info, "cid": i["cid"]}, cookie
+ ),
+ }
+ for i in pagelist
+ ]
+ except Exception as e:
+ logger.warning(f"Can't get page list with error {util.errorLogInfo(e)}")
+ return []
+
diff --git a/lib/getPlayInfo/page.py b/lib/getPlayInfo/page.py
new file mode 100644
index 0000000..5ec47b6
--- /dev/null
+++ b/lib/getPlayInfo/page.py
@@ -0,0 +1,69 @@
+from typing import *
+import requests
+import re
+import json
+import lib.util as util
+
+
+def get(video: str, cookie: str):
+ logger = util.getLogger("AnalyzingPage")
+ url = util.getPageUrl(video)
+ headers = util.getHeader(cookie, url)
+ try:
+ logger.info(f"request page url: {url}, headers: {headers}")
+ response = requests.get(url, headers=headers, timeout=60)
+ logger.info(f"response status code: {response.status_code}")
+ assert response.status_code // 100 == 2
+ html = response.text
+ except Exception as e:
+ logger.warning(f"Can't get page response with error {util.errorLogInfo(e)}")
+ util.messagebox.showerror(
+ "错误",
+ f"请求错误,无法获取页面信息\n{util.errorLogInfo(e)}",
+ )
+ return
+ try:
+ logger.info("start parse page")
+ flag = False
+ with open("a.html", "w", encoding="utf-8") as f:
+ f.write(html)
+ for i, maper in [
+ (
+ re.compile(r"window.__playinfo__=(.*?)", re.S),
+ lambda x: x["data"],
+ ),
+ (
+ re.compile(
+ r'',
+ re.S,
+ ),
+ lambda x: [
+ i["state"]["data"]["result"]["video_info"]
+ for i in x["props"]["pageProps"]["dehydratedState"]["queries"]
+ if util.optionalChain(i, "state", "data", "result", "video_info")
+ ][0],
+ ),
+ ]:
+ try:
+ palyInfo = maper(json.loads(re.findall(i, html)[0]))
+ except Exception:
+ continue
+ else:
+ flag = True
+ break
+
+ if not flag:
+ logger.warning("Can't find playinfo in page")
+ raise Exception("Can't find playinfo in page")
+ return [
+ {
+ "title": util.optionalChain(
+ re.findall(r"
(.*)", html), 0, default="Unknown"
+ ),
+ "playinfo": lambda: palyInfo,
+ }
+ ]
+ except Exception as e:
+ logger.warning(f"Can't parse page with error {util.errorLogInfo(e)}, return")
+ util.messagebox.showerror("错误", "解析错误,无法获取视频信息")
+ return
diff --git a/lib/util.py b/lib/util.py
new file mode 100644
index 0000000..e2e6e1a
--- /dev/null
+++ b/lib/util.py
@@ -0,0 +1,130 @@
+from typing import *
+import tkinter.messagebox as messagebox
+import logging
+import re
+import pathlib
+import sys
+import os
+import tempfile
+import time
+
+
+def toCallback(func: Callable, *args, **kwargs) -> Callable:
+ """
+ Converts a function to a callback format.
+
+ Args:
+ func (Callable): The function to be converted.
+ *args: Positional arguments to be passed to the function.
+ **kwargs: Keyword arguments to be passed to the function.
+
+ Returns:
+ Callable: A callback function that calls the original function with the provided arguments.
+ """
+ return lambda *args_callback, **kwargs_callback: func(
+ *args, *args_callback, **kwargs, **kwargs_callback
+ )
+
+
+def getLogger(name: str):
+ return logging.getLogger(name)
+
+
+def optionalChain(d: dict | list, *keys: Any, default: Any = None) -> Any:
+ """
+ Chains together a series of keys in a dictionary to retrieve a value.
+
+ Args:
+ d (dict): The dictionary to search.
+ *keys (str): The keys to chain together.
+ default (Any, optional): The default value to return if the key chain is invalid. Defaults to None.
+
+ Returns:
+ Any: The value associated with the key chain, or the default value if the key chain is invalid.
+ """
+ for key in keys:
+ if key in d:
+ d = d[key]
+ else:
+ return default
+ return d
+
+
+def getHeader(cookie: str | None = None, referer: str | None = None) -> dict:
+ return {
+ "Referer": referer,
+ "Cookie": cookie,
+ "Accept": "*/*" "Accept-language:zh-CN,zh;q=0.9,en;q=0.8",
+ "sec-ch-ua": '"Microsoft Edge";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
+ "sec-ch-ua-mobile": "?0",
+ "sec-ch-ua-platform": '"Windows"',
+ "sec-fetch-dest": "empty",
+ "sec-fetch-mode": "cors",
+ "sec-fetch-site": "same-site",
+ "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0",
+ }
+
+def getPageUrl(vid: str):
+ if re.match(
+ r"^(((ht|f)tps?):\/\/)?([^!@#$%^&*?.\s-]([^!@#$%^&*?.\s]{0,63}[^!@#$%^&*?.\s])?\.)+[a-z]{2,6}\/?",
+ vid,
+ ):
+ return vid
+ elif re.match(
+ re.compile("av[1-9][0-9]*", re.I),
+ vid,
+ ) or re.match("(?:B|b)(?:v|V)[0-9a-zA-Z]{10}", vid):
+ return f"https://www.bilibili.com/video/{vid}?spm_id_from=player_end_recommend_autoplay"
+ elif re.match(
+ re.compile("(?:ep|ss)[1-9][0-9]*", re.I),
+ vid,
+ ):
+ return f"https://www.bilibili.com/bangumi/play/{vid}"
+ return None
+
+if getattr(sys, "frozen", None):
+ dataBasePath = pathlib.Path(sys._MEIPASS)
+else:
+ dataBasePath = pathlib.Path(os.getcwd()).joinpath("data")
+
+
+def dataPath(filename: str | pathlib.Path):
+ if isinstance(filename, str):
+ filename = pathlib.Path(filename)
+ return dataBasePath.joinpath(filename)
+
+def errorLogInfo(e:BaseException):
+ return f"{e.__class__.__name__}:{str(e)}"
+
+
+tempRoot = pathlib.Path(tempfile.gettempdir()).joinpath(
+ f"k-bilibili-download-{time.time()}"
+)
+if not tempRoot.exists():
+ os.mkdir(tempRoot)
+
+fileLogHandler = logging.FileHandler(
+ filename=str(tempRoot.joinpath("log.txt")),
+ mode="w",
+ encoding="utf-8",
+)
+fmter = logging.Formatter(
+ fmt="[%(asctime)s] [%(name)s] [t-%(thread)d] [%(levelname)s]: %(message)s",
+ datefmt="%Y-%m-%d %H:%M:%S",
+)
+
+consoleLogHandler = logging.StreamHandler()
+
+rootLogger = logging.getLogger()
+
+rootLogger.addHandler(fileLogHandler)
+rootLogger.addHandler(consoleLogHandler)
+rootLogger.setLevel(logging.DEBUG)
+
+fileLogHandler.setLevel(logging.INFO)
+fileLogHandler.setFormatter(fmter)
+consoleLogHandler.setLevel(logging.INFO)
+consoleLogHandler.setFormatter(fmter)
+
+
+__all__ = ["toCallback", "messagebox", "getLogger", "optionalChain", "getHeader"]
diff --git a/main.py b/main.py
index c58a47b..9ccf85e 100644
--- a/main.py
+++ b/main.py
@@ -1,62 +1,24 @@
import tkinter as tk
from tkinter import messagebox, filedialog, ttk
-import os
import requests
-import json
+import lib.util as util
import threading
from typing import *
-import re
import io
import pyperclip
-import tempfile
import pathlib
import time
import subprocess
-import logging
-import sys
from PIL import Image, ImageTk
+import lib.getPlayInfo as getPlayInfo
-if getattr(sys, "frozen", None):
- dataBasePath = pathlib.Path(sys._MEIPASS)
-else:
- dataBasePath = pathlib.Path(os.getcwd()).joinpath("data")
-
-
-def dataPath(filename: str | pathlib.Path):
- if isinstance(filename, str):
- filename = pathlib.Path(filename)
- return dataBasePath.joinpath(filename)
-
-
-tempRoot = pathlib.Path(tempfile.gettempdir()).joinpath(
- f"k-bilibili-download-{time.time()}"
-)
-if not tempRoot.exists():
- os.mkdir(tempRoot)
-fileLogHandler = logging.FileHandler(
- filename=str(tempRoot.joinpath("log.txt")),
- mode="w",
- encoding="utf-8",
-)
-fmter = logging.Formatter(
- fmt="[%(asctime)s] [%(name)s] [t-%(thread)d] [%(levelname)s]: %(message)s",
- datefmt="%Y-%m-%d %H:%M:%S",
-)
-consoleLogHandler = logging.StreamHandler()
-rootLogger = logging.getLogger()
-rootLogger.addHandler(fileLogHandler)
-rootLogger.addHandler(consoleLogHandler)
-rootLogger.setLevel(logging.DEBUG)
-fileLogHandler.setLevel(logging.INFO)
-fileLogHandler.setFormatter(fmter)
-consoleLogHandler.setLevel(logging.INFO)
-consoleLogHandler.setFormatter(fmter)
+rootLogger = util.getLogger("root")
-rootLogger.info(f"base dir: {dataBasePath}")
+rootLogger.info(f"base dir: {util.dataBasePath}")
root = tk.Tk()
@@ -77,7 +39,7 @@ def _download(
fail: Callable[[], None],
progress: ttk.Progressbar,
):
- logger = rootLogger.getChild("_download")
+ logger = util.getLogger("_download")
logger.info(f"start downloading {url}")
@@ -95,7 +57,7 @@ def _download(
progress.update()
except Exception as e:
- logger.warning(f"download failed: {e.__class__.__name__}: {e}")
+ logger.warning(f"download failed: {util.errorLogInfo(e)}: {e}")
fail()
else:
@@ -105,12 +67,16 @@ def _download(
def startDownload(
- videoInfo: dict, audioInfo: dict, header: dict, savePath: pathlib.Path
+ videoInfo: dict, audioInfo: dict, cookie: str, savePath: pathlib.Path, video: str
):
- logger = rootLogger.getChild("startDownload")
+ logger = util.getLogger("startDownload")
- videoPath = tempRoot.joinpath(f"video-{time.time()}.tmp")
- audioPath = tempRoot.joinpath(f"audio-{time.time()}.tmp")
+ refererUrl = util.getPageUrl(video)
+
+ header = util.getHeader(cookie, refererUrl)
+
+ videoPath = util.tempRoot.joinpath(f"video-{time.time()}.tmp")
+ audioPath = util.tempRoot.joinpath(f"audio-{time.time()}.tmp")
logger.info(f"videoPath: {videoPath}, audioPath: {audioPath}")
@@ -244,9 +210,52 @@ def _start(t=Literal["video", "audio", "merge"]):
_start("audio")
+def askDownloadPart(
+ videoList: List[dict],
+ callback: Callable[[dict], None],
+):
+ logger = util.getLogger("askDownloadPart")
+
+ selectWindow = showModal(root)
+ selectWindow.title("选择要下载的部分")
+
+ _main = tk.Frame(selectWindow)
+ _main.grid(row=0, column=0, padx=10, pady=10)
+
+ tk.Label(_main, text="视频:").grid(row=0, column=0)
+
+ combobox = ttk.Combobox(_main, width=40)
+ combobox["values"] = [
+ f"{index+1}. {value['title']}" for index, value in enumerate(videoList)
+ ]
+ combobox.current(0)
+ combobox.config(state="readonly")
+
+ combobox.grid(row=0, column=1)
+
+ buttonBox = tk.Frame(_main)
+ buttonBox.grid(row=2, column=0, columnspan=2, pady=10, sticky="e")
+
+ def confirmed():
+ logger.info(f"download part confirmed by user, v:{combobox.current()}")
+ video = videoList[combobox.current()]
+ close()
+ logger.info("End this life cycle")
+ callback(video)
+
+ def close():
+ logger.info("window closed")
+ selectWindow.destroy()
+
+ ttk.Button(buttonBox, text="取消", command=close).grid(row=0, column=0)
+ ttk.Button(buttonBox, text="确认", command=confirmed).grid(row=0, column=1, padx=2)
+
+ logger.info("window initialized")
+
+
def requestDownload():
- logger = rootLogger.getChild("requestDownload")
+ logger = util.getLogger("requestDownload")
video = videoVar.get()
cookie = cookieVar.get().replace("\r", "").replace("\n", "")
@@ -258,113 +267,6 @@ def requestDownload():
logger.info("Incomplete information, return")
messagebox.showerror("错误", "请填写完整信息")
return
- if re.match(
- r"^(((ht|f)tps?):\/\/)?([^!@#$%^&*?.\s-]([^!@#$%^&*?.\s]{0,63}[^!@#$%^&*?.\s])?\.)+[a-z]{2,6}\/?",
- video,
- ):
- url = video
- logger.info("input type is url")
- elif re.match(
- re.compile("av[1-9][0-9]*", re.I),
- video,
- ) or re.match("(?:B|b)(?:v|V)[0-9a-zA-Z]{10}", video):
- url = f"https://www.bilibili.com/video/{video}?spm_id_from=player_end_recommend_autoplay"
- logger.info(f"input type is av/bv, transform to url: {url}")
- elif re.match(
- re.compile("(?:ep|ss)[1-9][0-9]*", re.I),
- video,
- ):
- url = f"https://www.bilibili.com/bangumi/play/{video}"
- logger.info(f"input type is ep, transform to url: {url}")
- else:
- logger.info("input type is invalid, return")
- messagebox.showerror("错误", "请输入正确的视频地址或av号/BV号")
- return
- headers = {
- "Referer": url,
- "Cookie": cookie,
- "Accept": "*/*" "Accept-language:zh-CN,zh;q=0.9,en;q=0.8",
- "sec-ch-ua": '"Microsoft Edge";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
- "sec-ch-ua-mobile": "?0",
- "sec-ch-ua-platform": '"Windows"',
- "sec-fetch-dest": "empty",
- "sec-fetch-mode": "cors",
- "sec-fetch-site": "same-site",
- "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0",
- }
-
- try:
- logger.info(f"request page url: {url}, headers: {headers}")
- response = requests.get(url, headers=headers, timeout=60)
- logger.info(f"response status code: {response.status_code}")
- assert response.status_code // 100 == 2
- html = response.text
- except Exception as e:
- logger.warning(
- f"Can't get page response with error {e.__class__.__name__}:{str(e)}"
- )
- messagebox.showerror(
- "错误",
- f"请求错误,无法获取页面信息\n{e.__class__.__name__}:{str(e)}",
- )
- return
-
- try:
- logger.info("start parse page")
- flag = False
- f = open("a.html", "w", encoding="utf-8")
- f.write(response.text)
- f.close()
- for i, maper in [
- (
- re.compile(r"window.__playinfo__=(.*?)", re.S),
- lambda x: x["data"],
- ),
- (
- re.compile(
- r'',
- re.S,
- ),
- lambda x: [
- i["state"]["data"]["result"]["video_info"]
- for i in x["props"]["pageProps"]["dehydratedState"]["queries"]
- if i.get("state", {})
- .get("data", {})
- .get("result", {})
- .get("video_info")
- ][0],
- ),
- ]:
- try:
- palyInfo = maper(json.loads(re.findall(i, html)[0]))
- except Exception:
- continue
- else:
- flag = True
- break
- if not flag:
- logger.warning("Can't find playinfo in page")
- raise Exception("Can't find playinfo in page")
-
- acceptQuality: Dict[int, str] = dict(
- zip(
- palyInfo["accept_quality"],
- palyInfo["accept_description"],
- )
- )
-
- videoList: List[dict] = palyInfo["dash"]["video"]
- audioList: List[dict] = palyInfo["dash"]["audio"]
-
- assert len(audioList)
- assert len(videoList)
-
- except Exception as e:
- logger.warning(
- f"Can't parse page with error {e.__class__.__name__}:{str(e)}, return"
- )
- messagebox.showerror("错误", "解析错误,无法获取视频信息")
- return
try:
logger.info("Validating the save path")
open(savePath, "wb").close()
@@ -372,13 +274,43 @@ def requestDownload():
logger.warning("Invalid save path, return")
messagebox.showerror("错误", "无法打开保存路径")
return
-
- def askDownloadTypeCallback(videoInfo: dict, audioInfo: dict):
- logger.info("download information confirmed")
- startDownload(videoInfo, audioInfo, headers, savePath)
+ getPlayList(video, cookie, savePath)
+
+
+def getPlayList(video: str, cookie: str, savePath: str):
+ logger = util.getLogger("getPlayList")
+ for i in (getPlayInfo.api, getPlayInfo.page):
+ res: List[dict] | None = i.get(video, cookie)
+ if res is None:
+ logger.warning("Play list not found")
+ return
+ elif res:
+ if len(res) > 1:
+ logger.info("Multiple play lists found, ask user to select")
+ askDownloadPart(
+ res,
+ util.toCallback(
+ getPlayUrl, cookie=cookie, savePath=savePath, video=video
+ ),
+ )
+ elif len(res) == 1:
+ logger.info("Single play list found, start get play url")
+ getPlayUrl(res[0], cookie, savePath, video)
+ return
+
+
+def getPlayUrl(videoInfo: dict, cookie: str, savePath: str, video: str):
+ logger = util.getLogger("getPlayUrl")
+
+ playinfo: Dict = videoInfo["playinfo"]()
logger.info("start ask download type")
- askDownloadType(videoList, audioList, acceptQuality, askDownloadTypeCallback)
+ askDownloadType(
+ playinfo["dash"]["video"],
+ playinfo["dash"]["audio"],
+ dict(zip(playinfo["accept_quality"], playinfo["accept_description"])),
+ util.toCallback(startDownload, cookie=cookie, savePath=savePath, video=video),
+ )
def askDownloadType(
@@ -387,7 +319,7 @@ def askDownloadType(
acceptQuality: Dict[int, str],
callback: Callable[[Dict, Dict], None],
):
- logger = rootLogger.getChild("askDownloadType")
+ logger = util.getLogger("askDownloadType")
selectWindow = showModal(root)
selectWindow.title("选择音视频通道")
@@ -448,7 +380,7 @@ def mergeVideo(
mergeSuccess: Callable[[], None],
mergeFail: Callable[[], None],
):
- logger = rootLogger.getChild("mergeVideo")
+ logger = util.getLogger("mergeVideo")
logger.info(f"merge video: {videoPath}, audio: {audioPath}, save: {savePath}")
@@ -485,7 +417,7 @@ def mergeVideo(
class HelpButton(tk.Label):
img = ImageTk.PhotoImage(
- Image.open(dataPath("help.png")).resize((18, 18), Image.LANCZOS)
+ Image.open(util.dataPath("help.png")).resize((18, 18), Image.LANCZOS)
)
helpTitle: str
helpText: str
@@ -514,7 +446,7 @@ def _showHelp(self, *_args, **_kw):
root.title("视频下载器")
root.resizable(0, 0)
try:
- root.iconphoto(True, tk.PhotoImage(file=str(dataPath("icon.png"))))
+ root.iconphoto(True, tk.PhotoImage(file=str(util.dataPath("icon.png"))))
except Exception as e:
rootLogger.warning("failed to load icon")
else:
diff --git a/requirements.txt b/requirements.txt
index 5ad8e80..a6e9afb 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,3 @@
-Cython==3.0.11
numpy==2.1.3
pillow==11.0.0
pyinstaller==6.11.1