-
Notifications
You must be signed in to change notification settings - Fork 0
/
ts_inspect.py
executable file
·103 lines (92 loc) · 3.41 KB
/
ts_inspect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#!/usr/bin/env python3
import argparse
import logging
from ts import *
import sys
class OmniSet(object):
def __contains__(self, x):
return True
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("mpeg_ts_file", help="The file to read")
parser.add_argument(
"--show-ts", help="Output TS packets.", action="store_true",
default=False)
parser.add_argument(
"--show-pes", help="Output PES packets.", action="store_true",
default=False)
parser.add_argument(
"--show-pat", help="Output PAT sections.", action="store_true",
default=False)
parser.add_argument(
"--show-pmt", help="Output PMT sections.", action="store_true",
default=False)
parser.add_argument(
"--as-c-array", action="store_true", default=False,
help="Output sections as C arrays instead of pretty-printing.")
parser.add_argument(
"--filter", type=lambda x: list(map(int, x.split(","))),
default=OmniSet(),
help="Only show output for PIDs in this comma-separated list.")
parser.add_argument(
"--no-wait", help="Don't want for input after output",
action="store_true", default=False)
parser.add_argument(
"--verbose", "-v", action="store_true", default=False,
help="Enable verbose output.")
args = parser.parse_args()
logging.basicConfig(
format='%(levelname)s: %(message)s',
level=logging.DEBUG if args.verbose else logging.INFO)
def wait():
if args.no_wait:
pass
else:
input()
def output(o):
print(o)
if args.as_c_array:
print("uint8_t %s_bytes[] = {%s};" %
(type(o).__name__.lower(), ", ".join(map(str, o.bytes))))
pmt_pids = set()
pes_readers = {}
ts_reader = read_ts(args.mpeg_ts_file)
while True:
try:
ts_packet = next(ts_reader)
except StopIteration:
break
except Exception as e:
print("Error reading TS packet: %s" % e)
continue
if args.show_ts and ts_packet.pid in args.filter:
output(ts_packet)
wait()
if ts_packet.pid == ProgramAssociationTable.PID:
try:
pat = ProgramAssociationTable(ts_packet.payload)
if args.show_pat and ts_packet.pid in args.filter:
output(pat)
wait()
pmt_pids.update(pat.programs.values())
except Exception as e:
print("Error reading PAT: %s" % e)
elif ts_packet.pid in pmt_pids:
try:
pmt = ProgramMapTable(ts_packet.payload)
if args.show_pmt and ts_packet.pid in args.filter:
output(pmt)
wait()
for pid in pmt.streams:
if pid not in pes_readers:
pes_readers[pid] = PESReader()
except Exception as e:
print("Error reading PMT: %s" % e)
elif args.show_pes and ts_packet.pid in pes_readers:
try:
pes_packet = pes_readers[ts_packet.pid].add_ts_packet(ts_packet)
if pes_packet and ts_packet.pid in args.filter:
output(pes_packet)
wait()
except Exception as e:
print("Error reading PES packet: %s" % e)