]> code.delx.au - youtube-cgi/blob - youtube.cgi
d1cb6667783a934428c49be0b01f8dcccfafd08c
[youtube-cgi] / youtube.cgi
1 #!/usr/bin/env python3
2
3 import cgi
4 import html.parser
5 import http.cookiejar
6 import json
7 import os
8 import re
9 import shutil
10 import subprocess
11 import sys
12 import time
13 import urllib.error
14 import urllib.parse
15 import urllib.request
16
17
# 128 MiB expressed in bytes.
# NOTE(review): nothing in the visible code reads this constant - confirm
# whether it is still needed.
MAX_MEMORY_BYTES = 128 * 1024 ** 2

# User-Agent header value sent with every outgoing HTTP request.
USER_AGENT = (
    "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) "
    "Gecko/20100101 Firefox/15.0.1"
)

# Accepted stream MIME types mapped to the file extension used when saving.
MIMETYPES = {
    "video/mp4": "mp4",
    "video/x-flv": "flv",
    "video/3gpp": "3gp",
}

# Accepted quality labels mapped to a rank; a larger rank is preferred.
QUALITIES = {
    "hd1080": 5,
    "hd720": 4,
    "large": 3,
    "medium": 2,
    "small": 1,
}
34
35
class VideoUnavailable(Exception):
    """Raised when no player configuration can be found for a video page."""
38
class NotYouTube(Exception):
    """Raised when a URL fails the YouTube scheme/host validation."""
41
def print_form(url="", msg=""):
    """Write the HTML download form as a complete CGI response to stdout.

    Args:
        url: Text pre-filled into the URL input box. It is treated as
            untrusted and HTML-escaped before being placed in the attribute.
        msg: HTML fragment inserted verbatim above the form. Callers must
            pass trusted markup and escape any dynamic parts themselves.

    Raises:
        KeyError: If ``HTTP_HOST`` or ``REQUEST_URI`` is missing from the
            environment.
    """
    # REQUEST_URI may carry the current query string; the bookmarklet appends
    # its own "?url=", so only the path part is kept.
    request_path = os.environ["REQUEST_URI"].split("?", 1)[0]
    script_url = "https://%s%s" % (os.environ["HTTP_HOST"], request_path)

    template = """
<!DOCTYPE html>
<html>
<head>
<title>delx.net.au - YouTube Scraper</title>
<link rel="stylesheet" type="text/css" href="/style.css">
<style type="text/css">
input[type="text"] {
width: 100%;
}
.error {
color: red;
}
</style>
</head>
<body>
<h1>delx.net.au - YouTube Scraper</h1>
{0}
<form action="" method="get">
<p>This page will let you easily download YouTube videos to watch offline. It
will automatically grab the highest quality version.</p>
<div><input type="text" name="url" value="{1}"/></div>
<div><input type="submit" value="Download!"/></div>
</form>
<p>Tip! Use this bookmarklet: <a href="javascript:(function(){window.location='{2}?url='+escape(location);})()">YouTube Download</a>
to easily download videos. Right-click the link and add it to bookmarks,
then when you're looking at a YouTube page select that bookmark from your
browser's bookmarks menu to download the video straight away.</p>
</body>
</html>
"""

    fields = {
        "{0}": msg,
        "{1}": html.escape(url, quote=True),
        "{2}": html.escape(script_url, quote=True),
    }
    # Substitute in a single pass so a placeholder-looking sequence inside one
    # value is never expanded by a later replacement.
    page = re.sub(r"\{[012]\}", lambda m: fields[m.group(0)], template)

    sys.stdout.write("Content-Type: text/html\r\n\r\n")
    sys.stdout.write(page)
76
# Module-wide HTTP state shared by every request made through urlopen():
# a cookie jar, an opener that stores/sends those cookies, and the URL that
# is sent as the Referer header (empty until the first request sets it).
cookiejar = http.cookiejar.CookieJar()
urlopener = urllib.request.build_opener(
    urllib.request.HTTPCookieProcessor(cookiejar)
)
referrer = ""
80
def urlopen(url, offset=None):
    """Open *url* with the shared opener and return the response object.

    Scheme-relative URLs ("//host/...") are given an "https:" scheme and any
    other URL lacking an http(s) scheme is resolved against
    https://www.youtube.com. The first URL ever opened is remembered in the
    module-level ``referrer`` and sent as the Referer header of every later
    request.

    Args:
        url: Absolute, scheme-relative or site-relative URL.
        offset: Optional byte offset; when truthy a ``Range: bytes=N-`` header
            is sent.

    Returns:
        The response returned by the shared ``urlopener``.

    Raises:
        ValueError: If the response carries a Content-Range header that is
            malformed or does not start at *offset*.
    """
    global referrer

    if url.startswith("//"):
        url = "https:" + url
    if not url.startswith(("http://", "https://")):
        url = "https://www.youtube.com" + url

    req = urllib.request.Request(url)
    if not referrer:
        referrer = url
    else:
        req.add_header("Referer", referrer)
    req.add_header("User-Agent", USER_AGENT)
    if offset:
        req.add_header("Range", "bytes=%d-" % offset)

    res = urlopener.open(req)

    content_range = res.getheader("Content-Range")
    if content_range:
        # Explicit checks instead of assert so they still run under "python -O".
        tokens = content_range.split()
        try:
            unit = tokens[0]
            start = int(tokens[1].split("-")[0])
        except (IndexError, ValueError):
            unit = start = None
        if unit != "bytes" or start != offset:
            res.close()
            raise ValueError(
                "Unexpected Content-Range %r for offset %r"
                % (content_range, offset)
            )
    return res
108
def validate_url(url):
    """Check that *url* is an https URL on youtube.com or youtu.be.

    A single leading "www." label is ignored. The host is taken from the
    parsed hostname, so letter case and an explicit port do not affect the
    comparison.

    Args:
        url: The URL string to validate.

    Returns:
        None when the URL is acceptable.

    Raises:
        NotYouTube: If the scheme is not https or the host is not recognised.
    """
    parsed_url = urllib.parse.urlparse(url)
    host = parsed_url.hostname or ""
    # Remove the literal prefix only. str.lstrip("www.") strips any run of the
    # characters "w" and ".", which would let "wyoutube.com" through.
    if host.startswith("www."):
        host = host[len("www."):]

    if parsed_url.scheme != "https" or host not in ("youtube.com", "youtu.be"):
        raise NotYouTube()
118
def parse_url(url, parser):
    """Fetch *url* and feed its UTF-8 decoded body to *parser*.

    Args:
        url: Address passed to ``urlopen``.
        parser: Object providing ``feed(str)`` and ``close()``.
    """
    f = urlopen(url)
    try:
        parser.feed(f.read().decode("utf-8"))
        parser.close()
    finally:
        # Release the connection even if reading, decoding or parsing fails.
        f.close()
124
def append_to_qs(url, params):
    """Return *url* with *params* merged into its query string.

    Existing parameters are kept, including ones with an empty value; a key
    present in *params* replaces every existing value for that key.

    Args:
        url: The URL to extend.
        params: Mapping of parameter name to a string or a sequence of strings.

    Returns:
        The rebuilt URL string.
    """
    parts = urllib.parse.urlsplit(url)
    # keep_blank_values=True so parameters such as "x=" are not silently lost.
    query = urllib.parse.parse_qs(parts.query, keep_blank_values=True)
    query.update(params)
    new_query = urllib.parse.urlencode(query, doseq=True)
    return urllib.parse.urlunsplit(parts._replace(query=new_query))
132
def get_player_config(scripts):
    """Find and decode the ``ytplayer.config = {...}`` object in *scripts*.

    Each script is scanned line by line for the assignment marker; the JSON
    object that starts at the marker's opening brace is decoded and returned.

    Args:
        scripts: Iterable of script source strings.

    Returns:
        The decoded configuration, or None if no line holds a decodable
        ``ytplayer.config`` object.
    """
    marker = "ytplayer.config = {"
    decoder = json.JSONDecoder()
    for script in scripts:
        for line in script.split("\n"):
            marker_pos = line.find(marker)
            if marker_pos < 0:
                continue
            # Index of the opening "{" that ends the marker.
            start = marker_pos + len(marker) - 1
            try:
                # raw_decode stops at the end of the first complete JSON
                # value, so "};" inside a string cannot truncate the object.
                config, _ = decoder.raw_decode(line, start)
            except ValueError:
                continue
            return config
    return None
143
def extract_js(script):
    """Return the body of the player script without its wrapper.

    The script must start with the ``var _yt_player={};(function(g){...``
    prologue and end with the matching ``;})(_yt_player);`` epilogue followed
    by a newline.

    Args:
        script: Full text of the player JavaScript file.

    Returns:
        The text between the prologue and the epilogue.

    Raises:
        ValueError: If the script does not have the expected wrapper.
    """
    PREFIX = "var _yt_player={};(function(g){var window=this;"
    SUFFIX = ";})(_yt_player);\n"
    # Raise explicitly: assert statements are removed under "python -O".
    if not script.startswith(PREFIX):
        raise ValueError("player script does not start with the expected prefix")
    if not script.endswith(SUFFIX):
        raise ValueError("player script does not end with the expected suffix")

    return script[len(PREFIX):-len(SUFFIX)]
151
def find_func_name(script):
    """Return the name of the function that is called with a ``.s`` argument.

    The script is searched for the first call shaped like ``NAME(x.y.s)``
    followed by ``,``, ``;`` or ``)``.

    Args:
        script: JavaScript source text to search.

    Returns:
        The matched function name.

    Raises:
        ValueError: If no such call is present in *script*.
    """
    FUNC_NAME = R"([a-zA-Z0-9$]+)"
    FUNC_PARAMS = R"(\([a-zA-Z,\.]+\.s\))"
    TERMINATOR = R"[,;\)]"
    PATTERN = FUNC_NAME + FUNC_PARAMS + TERMINATOR

    match = re.search(PATTERN, script)
    if match is None:
        raise ValueError("could not locate the signature function in the script")
    return match.group(1)
161
def decode_signature(js_url, signature):
    """Transform an encoded stream signature by running the player script.

    The player JavaScript is downloaded from *js_url*, its wrapper removed
    with ``extract_js`` and the function named by ``find_func_name`` is
    executed in a Node.js ``vm`` sandbox; the value stored under the
    ``MARKER`` key of its result is printed by node and returned here.

    Args:
        js_url: Location of the player JavaScript file.
        signature: The encoded signature string.

    Returns:
        Whatever node printed, decoded as UTF-8 and stripped of surrounding
        whitespace.

    Raises:
        Exception: If the node process exits with a non-zero status.
    """
    f = urlopen(js_url)
    try:
        script = f.read().decode("utf-8")
    finally:
        f.close()

    func_name = find_func_name(script)

    params = {
        "func_name": func_name,
        # json.dumps yields quoted, escaped literals that are safe to embed
        # in the generated JavaScript.
        "signature": json.dumps(signature),
        "code": json.dumps(extract_js(script)),
    }
    js_decode_script = ("""
        const vm = require('vm');

        const sandbox = {
            location: {
                hash: '',
                href: '',
                protocol: 'http:'
            },
            history: {
                pushState: function(){}
            },
            document: {},
            navigator: {
                userAgent: ''
            },
            XMLHttpRequest: class XMLHttpRequest {},
            matchMedia: () => ({matches: () => {}, media: ''}),
            signature: %(signature)s,
            transformed_signature: null,
            g: function(){} // this is _yt_player
        };
        sandbox.window = sandbox;

        const code_string = %(code)s + ';';
        const exec_string = 'transformed_signature = %(func_name)s("", "MARKER", signature);';
        vm.runInNewContext(code_string + exec_string, sandbox);

        function findSignature(obj) {
            if (typeof obj !== 'object') {
                return;
            }
            for (const [key, value] of Object.entries(obj)) {
                if (key === 'MARKER') {
                    return value;
                }
                const result = findSignature(value);
                if (result) {
                    return result;
                }
            }
        }
        console.log(findSignature(sandbox.transformed_signature));
    """ % params)

    # An argument list without a shell: nothing here needs shell parsing.
    p = subprocess.Popen(
        ["node"],
        close_fds=True,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    # communicate() writes stdin, closes it, drains stdout and waits.
    stdout_data, _ = p.communicate(js_decode_script.encode("utf-8"))
    if p.returncode != 0:
        raise Exception("js failed to execute: %d" % p.returncode)

    return stdout_data.decode("utf-8").strip()
234
def get_best_video(player_config):
    """Pick the highest-quality supported stream from a player config.

    Entries of ``args.url_encoded_fmt_stream_map`` are skipped when they are
    flagged ``stereo3d``, have a quality missing from ``QUALITIES`` or a MIME
    type missing from ``MIMETYPES``, or lack a ``type``/``quality``/``url``
    field. Among the rest the highest rank wins; on a tie the later entry
    wins. The signature is resolved for the winning entry only.

    Args:
        player_config: Decoded player configuration; must contain
            ``args.url_encoded_fmt_stream_map`` and ``assets.js``.

    Returns:
        ``(url, extension)`` for the chosen stream, or ``(None, None)`` when
        no entry qualifies.
    """
    url_data_list = player_config["args"]["url_encoded_fmt_stream_map"].split(",")
    js_url = player_config["assets"]["js"]

    best = None  # (quality rank, file extension, parsed entry)
    for raw_entry in url_data_list:
        url_data = urllib.parse.parse_qs(raw_entry)
        try:
            mimetype = url_data["type"][0].split(";")[0]
            quality = url_data["quality"][0]
            url_data["url"][0]
        except KeyError:
            # Incomplete entry (e.g. the empty string from splitting "").
            continue

        if "stereo3d" in url_data:
            continue
        if quality not in QUALITIES or mimetype not in MIMETYPES:
            continue

        rank = QUALITIES[quality]
        if best is not None and rank < best[0]:
            continue
        best = (rank, MIMETYPES[mimetype], url_data)

    if best is None:
        return None, None

    _, best_extension, url_data = best
    best_url = url_data["url"][0]

    # Decoding a signature downloads the player script and spawns node, so it
    # is done once for the winner rather than for every candidate.
    if "sig" in url_data:
        signature = url_data["sig"][0]
    elif "s" in url_data:
        signature = decode_signature(js_url, url_data["s"][0])
    else:
        signature = None

    if signature:
        best_url = append_to_qs(best_url, {"signature": signature})

    return best_url, best_extension
276
def sanitize_filename(filename):
    """Return *filename* made safe to use as a single path component.

    Surrounding whitespace is removed, every run of whitespace collapses to
    one space, backslashes and slashes become "-" and NUL bytes become spaces.

    Args:
        filename: The raw name, for example a page title.

    Returns:
        The cleaned-up name.
    """
    # Raw string: "\s" in a normal literal is an invalid escape sequence.
    collapsed = re.sub(r"\s+", " ", filename.strip())
    return (
        collapsed
        .replace("\\", "-")
        .replace("/", "-")
        .replace("\0", " ")
    )
284
def get_video_url(page):
    """Work out the download URL and output filename for a parsed page.

    Args:
        page: Parsed page exposing ``scripts``, ``title`` and
            ``unavailable_message``.

    Returns:
        ``(video_url, filename)``, or ``(None, None)`` when the player config
        lists no usable stream.

    Raises:
        VideoUnavailable: If no player config is found; carries the page's
            unavailable message when it has one.
    """
    player_config = get_player_config(page.scripts)
    if not player_config:
        raise VideoUnavailable(page.unavailable_message or "Could not find video URL")

    video_url, extension = get_best_video(player_config)
    if not video_url:
        return None, None

    # The title stays None when the page had no <title> text; fall back to a
    # generic name instead of failing inside sanitize_filename.
    filename = sanitize_filename(page.title or "") or "video"
    filename += "." + extension

    return video_url, filename
298
class YouTubeVideoPageParser(html.parser.HTMLParser):
    """HTML parser that records a page's title, error text and scripts.

    Text is routed by rebinding ``self.handle_data`` whenever an interesting
    start tag is seen; every end tag switches back to discarding text.

    Attributes set while parsing:
        title: Stripped text seen after a ``<title>`` tag, else None.
        unavailable_message: Stripped text seen after an element whose id is
            ``unavailable-message``, else None.
        scripts: Non-empty text chunks seen after ``<script>`` tags.
    """

    def __init__(self):
        super().__init__()
        self.title = None
        self.unavailable_message = None
        self.scripts = []

    def handle_starttag(self, tag, attrs):
        attributes = dict(attrs)
        # Checked in this order; when several conditions match, the last
        # assignment decides where the following text goes.
        if tag == "title":
            self.handle_data = self._store_title
        if attributes.get("id") == "unavailable-message":
            self.handle_data = self._store_unavailable_message
        if tag == "script":
            self.handle_data = self._collect_script

    def handle_endtag(self, tag):
        self.handle_data = self._discard

    def _discard(self, _):
        return None

    def _store_title(self, text):
        self.title = text.strip()

    def _store_unavailable_message(self, text):
        self.unavailable_message = text.strip()

    def _collect_script(self, text):
        if not text:
            return
        self.scripts.append(text)
339
def write_video(filename, video_data):
    """Stream *video_data* to stdout as a CGI attachment response.

    Writes a Content-Disposition header carrying the percent-encoded UTF-8
    filename, a Content-Length header when the upstream response provides
    one, a blank line, and then the body. *video_data* is always closed.

    Args:
        filename: Name suggested to the browser for the download.
        video_data: Open response providing ``getheader``, ``read`` and
            ``close``.
    """
    out = sys.stdout.buffer
    try:
        quoted_filename = urllib.parse.quote(filename.encode("utf-8"))
        header = (
            b"Content-Disposition: attachment; filename*=UTF-8''"
            + quoted_filename.encode("utf-8")
            + b"\r\n"
        )
        # getheader returns None when the upstream omitted the header; leave
        # it out in that case rather than crash.
        content_length = video_data.getheader("Content-Length")
        if content_length is not None:
            header += b"Content-Length: " + content_length.encode("utf-8") + b"\r\n"
        out.write(header + b"\r\n")
        shutil.copyfileobj(video_data, out)
    finally:
        video_data.close()
353
def cgimain():
    """Handle one CGI request.

    Without a ``url`` parameter the empty form is shown. Otherwise the URL is
    validated, the page fetched and the best stream opened; on success the
    video is streamed to the client, on any failure the form is shown again
    with an error message and nothing is streamed.
    """
    args = cgi.parse()
    try:
        url = args["url"][0]
    except (KeyError, IndexError):
        print_form(url="https://www.youtube.com/watch?v=FOOBAR")
        return

    try:
        page = YouTubeVideoPageParser()
        validate_url(url)
        parse_url(url, page)
        video_url, filename = get_video_url(page)
        if not video_url:
            # get_video_url returns (None, None) when no stream qualifies.
            raise VideoUnavailable("Could not find video URL")
        video_data = urlopen(video_url)
    except VideoUnavailable as e:
        # html.escape replaces cgi.escape, which was removed in Python 3.8.
        print_form(
            url=url,
            msg="<p class='error'>Sorry, there was an error: %s</p>" % html.escape(str(e))
        )
    except NotYouTube:
        print_form(
            url=url,
            msg="<p class='error'>Sorry, that does not look like a YouTube page!</p>"
        )
    except Exception as e:
        # Record the cause on stderr instead of discarding it.
        print("youtube.cgi: unexpected error: %r" % (e,), file=sys.stderr)
        print_form(
            url=url,
            msg="<p class='error'>Sorry, there was an unknown error.</p>"
        )
    else:
        write_video(filename, video_data)
386
def pp_size(size):
    """Format a byte count with a binary-unit suffix and two decimals.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit (GiB) is reached; larger values are reported in GiB. Values below
    1024 get an empty suffix (the result then ends with a space).

    Args:
        size: Number of bytes (int or float).

    Returns:
        A string such as ``"1.50 KiB"``.
    """
    suffixes = ["", "KiB", "MiB", "GiB"]
    for suffix in suffixes[:-1]:
        if size < 1024:
            return "%.2f %s" % (size, suffix)
        size /= 1024
    # Never divide past the last unit, otherwise the number no longer
    # matches the suffix printed next to it.
    return "%.2f %s" % (size, suffixes[-1])
394
def copy_with_progress(content_length, infile, outfile):
    """Copy *infile* to *outfile* in 32 KiB chunks, printing progress.

    A status line (bytes copied / *content_length* and the transfer rate
    since the previous report) is rewritten on stdout whenever more than half
    a second has passed since the last report, and once more at the end,
    followed by a newline.

    Args:
        content_length: Size shown as the total in the status line.
        infile: Source object providing ``read(n)``.
        outfile: Destination object providing ``write(data)``.
    """
    window_start = 0
    window_bytes = 0
    total_bytes = 0

    def report():
        # Reads the enclosing function's counters at call time.
        elapsed = now - window_start
        speed = window_bytes / elapsed if elapsed != 0 else 0
        sys.stdout.write("\33[2K\r")
        sys.stdout.write("%s / %s (%s/sec)" % (
            pp_size(total_bytes),
            pp_size(content_length),
            pp_size(speed),
        ))
        sys.stdout.flush()

    while True:
        now = time.time()
        if now - window_start > 0.5:
            report()
            window_start = now
            window_bytes = 0

        chunk = infile.read(32768)
        if not chunk:
            break
        outfile.write(chunk)
        chunk_size = len(chunk)
        window_bytes += chunk_size
        total_bytes += chunk_size

    # Final status line, then move to a fresh line.
    report()
    print()
428
def main():
    """Command-line entry point: download the video at ``sys.argv[1]``.

    The video is appended to a file named after the page title, so an
    existing partial file is resumed from its current size. The download is
    restarted from the current file size until the expected total has been
    written or the server answers 416 to the range request.
    """
    try:
        url = sys.argv[1]
    except IndexError:
        print("Usage: %s https://youtube.com/watch?v=FOOBAR" % sys.argv[0], file=sys.stderr)
        sys.exit(1)

    page = YouTubeVideoPageParser()
    parse_url(url, page)
    video_url, filename = get_video_url(page)
    if not video_url:
        # get_video_url returns (None, None) when no stream qualifies.
        print("Could not find a downloadable video", file=sys.stderr)
        sys.exit(1)
    print("Downloading", filename)

    with open(filename, "ab") as outfile:
        offset = outfile.tell()
        if offset > 0:
            print("Resuming download from", pp_size(offset))
        total_size = None

        while True:
            try:
                video_data = urlopen(video_url, offset)
            except urllib.error.HTTPError as e:
                if e.code == 416:
                    print("File is complete!")
                    break
                raise

            try:
                content_length = int(video_data.getheader("Content-Length"))
                if total_size is None:
                    # A ranged response reports only the remaining bytes, so
                    # the bytes already on disk are added to get the total.
                    total_size = offset + content_length

                try:
                    copy_with_progress(content_length, video_data, outfile)
                except IOError:
                    # Best effort: a broken transfer is retried below.
                    print()
            finally:
                video_data.close()

            if outfile.tell() == total_size:
                break

            old_offset = offset
            offset = outfile.tell()
            if old_offset == offset:
                # No progress this round; pause before retrying.
                time.sleep(1)
            print("Restarting download from", pp_size(offset))
477
478
if __name__ == "__main__":
    # The presence of SCRIPT_NAME in the environment selects CGI mode;
    # otherwise the script runs as a command-line downloader.
    if "SCRIPT_NAME" not in os.environ:
        try:
            main()
        except KeyboardInterrupt:
            print("\nExiting...")
            sys.exit(1)
    else:
        cgimain()
488