Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Using script data #13

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,470 changes: 1,470 additions & 0 deletions data_discovery.ipynb

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions fast_flights/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from .core import get_flights
from .filter import create_filter
from .flights_impl import Airport, FlightData, Passengers, TFSData
from .schema import Flight, Result
from .schema import HTMLParsedFlight, HTMLParsedResult, FlightsAPIResult
from .search import search_airport

__all__ = [
Expand All @@ -12,8 +12,9 @@
"FlightData",
"Passengers",
"get_flights",
"Result",
"Flight",
"HTMLParsedResult",
"HTMLParsedFlight",
"FlightsAPIResult",
"search_airport",
"Cookies",
]
199 changes: 114 additions & 85 deletions fast_flights/core.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
from typing import Any, Optional
import re
import json
from pathlib import Path
from typing import Any, Optional, Literal, Union, Dict, overload

import requests
from selectolax.lexbor import LexborHTMLParser, LexborNode

from .flights_impl import TFSData
from .schema import Flight, Result
from .schema import FlightsAPIResult, HTMLParsedFlight, HTMLParsedResult

# Selects how a response is parsed: 'js_data' reads the JSON embedded in a
# script tag, 'html' scrapes the rendered markup.
ParseMethod = Union[Literal['js_data'], Literal['html']]

ua = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
Expand All @@ -18,7 +23,7 @@ def request_flights(
*,
currency: Optional[str] = None,
language: Optional[str],
**kwargs: Any,
**kwargs: Dict[str, Any],
) -> requests.Response:
r = requests.get(
"https://www.google.com/travel/flights",
Expand All @@ -34,86 +39,102 @@ def request_flights(
r.raise_for_status()
return r

# Typing-only overload: parse_method='js_data' yields a FlightsAPIResult.
@overload
def parse_response(r: requests.Response, *, parse_method: Literal['js_data'], **kwargs) -> FlightsAPIResult: ...

def parse_response(
    r: requests.Response, *, dangerously_allow_looping_last_item: bool = False
) -> Result:
    """Scrape the flight listings out of the HTML body of *r*.

    Every ``li`` under ``ul.Rk10dc`` inside the matched result containers
    becomes one flight; rows from the first container are flagged
    ``is_best``. Unless ``dangerously_allow_looping_last_item`` is true, the
    last ``li`` of each container is skipped.

    Returns a ``Result`` holding the text of ``span.gOatQ`` as the current
    price together with the parsed flights.
    """

    class _Empty:
        """Stand-in for a node that was not found; yields empty values."""

        def text(self, *_, **__):
            return ""

        def iter(self):
            return []

    missing = _Empty()

    def first_text(node, selector: str, **options) -> str:
        # Text of the first match, or "" when the selector matches nothing.
        return (node.css_first(selector) or missing).text(**options)

    document = LexborHTMLParser(r.text)
    rows = []

    containers = document.css('div[jsname="IWWDBc"], div[jsname="YdtKid"]')
    for position, container in enumerate(containers):
        entries = container.css("ul.Rk10dc li")
        if not dangerously_allow_looping_last_item:
            entries = entries[:-1]

        for entry in entries:
            airline = first_text(
                entry, "div.sSHqwe.tPgKwe.ogfYpf span", strip=True
            )

            # Departure and arrival share one selector; either may be absent.
            time_cells = entry.css("span.mv1WYe div")
            try:
                departs, arrives = (
                    time_cells[0].text(strip=True),
                    time_cells[1].text(strip=True),
                )
            except IndexError:
                departs = arrives = ""

            ahead = first_text(entry, "span.bOzv6")
            length = first_text(entry, "li div.Ak5kof div")
            stops_text = first_text(entry, ".BbR8Ec .ogfYpf")
            delay_text = first_text(entry, ".GsCCve") or None
            fare = first_text(entry, ".YMlIz.FpEdX") or "0"

            # "Nonstop" maps to 0; otherwise the leading token must be a number.
            try:
                if stops_text == "Nonstop":
                    stop_count = 0
                else:
                    stop_count = int(stops_text.split(" ", 1)[0])
            except ValueError:
                stop_count = "Unknown"

            rows.append(
                dict(
                    is_best=position == 0,
                    name=airline,
                    departure=" ".join(departs.split()),
                    arrival=" ".join(arrives.split()),
                    arrival_time_ahead=ahead,
                    duration=length,
                    stops=stop_count,
                    delay=delay_text,
                    price=fare.replace(",", ""),
                )
            )

    headline_price = first_text(document, "span.gOatQ")

    return Result(current_price=headline_price, flights=[Flight(**row) for row in rows])  # type: ignore
# Typing-only overload: parse_method='html' yields an HTMLParsedResult.
@overload
def parse_response(r: requests.Response, *, parse_method: Literal['html'], **kwargs) -> HTMLParsedResult: ...

def parse_response(
    r: requests.Response,
    *,
    dangerously_allow_looping_last_item: bool = False,
    parse_method: ParseMethod = 'js_data',
) -> Union[FlightsAPIResult, HTMLParsedResult]:
    """Parse a flights response with the requested strategy.

    Args:
        r: The HTTP response whose ``text`` is parsed.
        dangerously_allow_looping_last_item: Only used by the ``'html'``
            strategy. When false, the last ``li`` of every result container
            is skipped.
        parse_method: ``'js_data'`` extracts the JSON array embedded in the
            page's ``ds:1`` script block; ``'html'`` scrapes the markup.

    Returns:
        A ``FlightsAPIResult`` for ``'js_data'``, an ``HTMLParsedResult`` for
        ``'html'``.

    Raises:
        AssertionError: ``'js_data'`` was requested but the script data
            could not be located in the response.
        NotImplementedError: ``parse_method`` is neither accepted value.
    """
    if parse_method == 'js_data':
        return _parse_js_data(r)
    if parse_method == 'html':
        return _parse_html(
            r, allow_last_item=dangerously_allow_looping_last_item
        )
    raise NotImplementedError('Only js_data and html are accepted for parse_method')


def _parse_js_data(r: requests.Response) -> FlightsAPIResult:
    """Build a FlightsAPIResult from the ``ds:1`` data embedded in a script tag."""
    match = re.search(r'key: \'ds:1\',.*?data:(\[.*?\]), sideChannel: {', r.text)
    if match is None:
        # Raised explicitly instead of via `assert`: assert statements are
        # removed under `python -O`, which would let `match.group` fail with
        # an unrelated AttributeError. The request details travel in the
        # message rather than being printed or dumped to a file in the
        # working directory.
        raise AssertionError(
            'Cannot find flight data in script tag '
            f'(status {r.status_code}, url {r.request.url})'
        )
    return FlightsAPIResult.parse(json.loads(match.group(1)))


def _parse_html(r: requests.Response, *, allow_last_item: bool) -> HTMLParsedResult:
    """Build an HTMLParsedResult by scraping the result lists in the markup."""

    class _Blank:
        """Stand-in for a node that was not found; yields empty values."""

        def text(self, *_, **__):
            return ""

        def iter(self):
            return []

    blank = _Blank()

    def safe(n: Optional[LexborNode]):
        return n or blank

    parser = LexborHTMLParser(r.text)
    flights = []

    for i, fl in enumerate(parser.css('div[jsname="IWWDBc"], div[jsname="YdtKid"]')):
        # Rows from the first matched container are flagged as "best".
        is_best_flight = i == 0

        items = fl.css("ul.Rk10dc li")
        if not allow_last_item:
            items = items[:-1]

        for item in items:
            name = safe(item.css_first("div.sSHqwe.tPgKwe.ogfYpf span")).text(
                strip=True
            )

            # Departure and arrival share one selector; sometimes not present.
            dp_ar_node = item.css("span.mv1WYe div")
            try:
                departure_time = dp_ar_node[0].text(strip=True)
                arrival_time = dp_ar_node[1].text(strip=True)
            except IndexError:
                departure_time = ""
                arrival_time = ""

            time_ahead = safe(item.css_first("span.bOzv6")).text()
            duration = safe(item.css_first("li div.Ak5kof div")).text()
            stops = safe(item.css_first(".BbR8Ec .ogfYpf")).text()
            delay = safe(item.css_first(".GsCCve")).text() or None
            price = safe(item.css_first(".YMlIz.FpEdX")).text() or "0"

            # "Nonstop" maps to 0; otherwise the leading token must be a number.
            try:
                stops_fmt = 0 if stops == "Nonstop" else int(stops.split(" ", 1)[0])
            except ValueError:
                stops_fmt = "Unknown"

            flights.append(
                {
                    "is_best": is_best_flight,
                    "name": name,
                    "departure": " ".join(departure_time.split()),
                    "arrival": " ".join(arrival_time.split()),
                    "arrival_time_ahead": time_ahead,
                    "duration": duration,
                    "stops": stops_fmt,
                    "delay": delay,
                    "price": price.replace(",", ""),
                }
            )

    current_price = safe(parser.css_first("span.gOatQ")).text()
    return HTMLParsedResult(current_price=current_price, flights=[HTMLParsedFlight(**fl) for fl in flights])  # type: ignore


# Typing-only overload: parse_method='js_data' yields a FlightsAPIResult.
@overload
def get_flights(tfs: TFSData, *, parse_method: Literal['js_data'], **kwargs) -> FlightsAPIResult: ...

# Typing-only overload: parse_method='html' yields an HTMLParsedResult.
@overload
def get_flights(tfs: TFSData, *, parse_method: Literal['html'], **kwargs) -> HTMLParsedResult: ...

def get_flights(
tfs: TFSData,
Expand All @@ -122,20 +143,28 @@ def get_flights(
language: Optional[str] = None,
cookies: Optional[dict] = None,
dangerously_allow_looping_last_item: bool = False,
parse_method: ParseMethod = 'js_data',
attempted: bool = False,
**kwargs: Any,
) -> Result:
**kwargs: Dict[str, Any],
) -> Union[FlightsAPIResult, HTMLParsedResult]:
rs = request_flights(tfs, currency=currency, language=language, **kwargs)
results = parse_response(
rs, dangerously_allow_looping_last_item=dangerously_allow_looping_last_item
rs,
dangerously_allow_looping_last_item=dangerously_allow_looping_last_item,
parse_method=parse_method,
)

if not results.flights:
flights = results.flights if parse_method == 'html' else [*results.best, *results.other] # type: ignore

if not flights:
if not attempted:
return get_flights(
tfs,
currency=currency,
language=language,
cookies=cookies,
dangerously_allow_looping_last_item=dangerously_allow_looping_last_item,
parse_method=parse_method,
attempted=True,
**kwargs,
)
Expand Down
Loading