]>
code.delx.au - youtube-cgi/blob - youtube.cgi
d1cb6667783a934428c49be0b01f8dcccfafd08c
# Memory ceiling in bytes (128 MiB).
# NOTE(review): where this limit is applied is not visible in this excerpt.
MAX_MEMORY_BYTES = 128 * 1024 ** 2

# Browser-like User-Agent value sent as the "User-Agent" request header.
USER_AGENT = (
    "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1"
)
36 class VideoUnavailable(Exception):
39 class NotYouTube(Exception):
42 def print_form(url
="", msg
=""):
43 script_url
= "https://%s%s" % (os
.environ
["HTTP_HOST"], os
.environ
["REQUEST_URI"])
44 sys
.stdout
.write("Content-Type: text/html\r\n\r\n")
49 <title>delx.net.au - YouTube Scraper</title>
50 <link rel="stylesheet" type="text/css" href="/style.css">
51 <style type="text/css">
61 <h1>delx.net.au - YouTube Scraper</h1>
63 <form action="" method="get">
64 <p>This page will let you easily download YouTube videos to watch offline. It
65 will automatically grab the highest quality version.</p>
66 <div><input type="text" name="url" value="{1}"/></div>
67 <div><input type="submit" value="Download!"/></div>
69 <p>Tip! Use this bookmarklet: <a href="javascript:(function(){window.location='{2}?url='+escape(location);})()">YouTube Download</a>
70 to easily download videos. Right-click the link and add it to bookmarks,
71 then when you're looking at a YouTube page select that bookmark from your
72 browser's bookmarks menu to download the video straight away.</p>
75 """.replace("{0}", msg
).replace("{1}", url
).replace("{2}", script_url
))
# One in-memory cookie jar, attached to the opener below so that cookies set
# by one response are sent with later requests made through ``urlopener``.
cookiejar = http.cookiejar.CookieJar()
_cookie_handler = urllib.request.HTTPCookieProcessor(cookiejar)
urlopener = urllib.request.build_opener(_cookie_handler)
81 def urlopen(url
, offset
=None):
82 if url
.startswith("//"):
84 if not url
.startswith("http://") and not url
.startswith("https://"):
85 url
= "https://www.youtube.com" + url
88 req
= urllib
.request
.Request(url
)
92 req
.add_header("Referer", referrer
)
94 req
.add_header("User-Agent", USER_AGENT
)
97 req
.add_header("Range", "bytes=%d-" % offset
)
99 res
= urlopener
.open(req
)
101 content_range
= res
.getheader("Content-Range")
103 tokens
= content_range
.split()
104 assert tokens
[0] == "bytes"
105 start
= int(tokens
[1].split("-")[0])
106 assert start
== offset
def validate_url(url):
    """Check that ``url`` is an https URL on a recognised YouTube host.

    Accepted hosts are ``youtube.com`` and ``youtu.be``, each optionally
    preceded by exactly one ``www.`` prefix.

    Args:
        url: The URL string to check.

    Returns:
        None when the URL is acceptable.

    Raises:
        NotYouTube: if the scheme is not https or the host is not accepted.
    """
    parsed_url = urllib.parse.urlparse(url)
    scheme_ok = parsed_url.scheme == "https"

    # Remove the literal "www." prefix only. str.lstrip("www.") treats its
    # argument as a set of characters, which would also turn hosts such as
    # "wyoutube.com" or ".youtube.com" into "youtube.com" and accept them.
    host = parsed_url.netloc
    if host.startswith("www."):
        host = host[len("www."):]
    host_ok = host in ["youtube.com", "youtu.be"]

    # NOTE(review): the failure branch of the original was not visible in the
    # reviewed excerpt; raising NotYouTube (declared elsewhere in this file)
    # is assumed from the file's NotYouTube handler -- confirm.
    if not (scheme_ok and host_ok):
        raise NotYouTube()
119 def parse_url(url
, parser
):
121 parser
.feed(f
.read().decode("utf-8"))
125 def append_to_qs(url
, params
):
126 r
= list(urllib
.parse
.urlsplit(url
))
127 qs
= urllib
.parse
.parse_qs(r
[3])
129 r
[3] = urllib
.parse
.urlencode(qs
, True)
130 url
= urllib
.parse
.urlunsplit(r
)
133 def get_player_config(scripts
):
135 for script
in scripts
:
136 for line
in script
.split("\n"):
137 s
= "ytplayer.config = {"
139 p1
= line
.find(s
) + len(s
) - 1
140 p2
= line
.find("};", p1
) + 1
141 if p1
>= 0 and p2
> 0:
142 return json
.loads(line
[p1
:p2
])
def extract_js(script):
    """Return the player script with its outer function wrapper removed.

    The script is expected to begin with PREFIX and end with SUFFIX; the
    text between the two is returned unchanged.

    Args:
        script: Full text of the player JavaScript.

    Returns:
        The portion of ``script`` between PREFIX and SUFFIX.

    Raises:
        AssertionError: if ``script`` does not start with PREFIX or does not
            end with SUFFIX.
    """
    PREFIX = "var _yt_player={};(function(g){var window=this;"
    SUFFIX = ";})(_yt_player);\n"

    # Raise explicitly instead of using ``assert`` so the checks still run
    # under ``python -O``; the exception type is the one ``assert`` raised.
    if not script.startswith(PREFIX):
        raise AssertionError("player script does not start with the expected prefix")
    if not script.endswith(SUFFIX):
        raise AssertionError("player script does not end with the expected suffix")

    return script[len(PREFIX):-len(SUFFIX)]
152 def find_func_name(script
):
153 FUNC_NAME
= R
"([a-zA-Z0-9$]+)"
154 FUNC_PARAMS
= R
"(\([a-zA-Z,\.]+\.s\))"
155 TERMINATOR
= R
"[,;\)]"
156 PATTERN
= FUNC_NAME
+ FUNC_PARAMS
+ TERMINATOR
158 match
= re
.search(PATTERN
, script
)
159 func_name
= match
.groups()[0]
162 def decode_signature(js_url
, signature
):
164 script
= f
.read().decode("utf-8")
167 func_name
= find_func_name(script
)
170 "func_name": func_name
,
171 "signature": json
.dumps(signature
),
172 "code": json
.dumps(extract_js(script
)),
174 p
= subprocess
.Popen(
178 stdin
=subprocess
.PIPE
,
179 stdout
=subprocess
.PIPE
181 js_decode_script
= ("""
182 const vm = require('vm');
191 pushState: function(){}
197 XMLHttpRequest: class XMLHttpRequest {},
198 matchMedia: () => ({matches: () => {}, media: ''}),
199 signature: %(signature)s,
200 transformed_signature: null,
201 g: function(){} // this is _yt_player
203 sandbox.window = sandbox;
205 const code_string = %(code)s + ';';
206 const exec_string = 'transformed_signature = %(func_name)s("", "MARKER", signature);';
207 vm.runInNewContext(code_string + exec_string, sandbox);
209 function findSignature(obj) {
210 if (typeof obj !== 'object') {
213 for (const [key, value] of Object.entries(obj)) {
214 if (key === 'MARKER') {
217 const result = findSignature(value);
223 console.log(findSignature(sandbox.transformed_signature));
226 p
.stdin
.write(js_decode_script
.encode("utf-8"))
229 transformed_signature
= p
.stdout
.read().decode("utf-8").strip()
231 raise Exception("js failed to execute: %d" % p
.returncode
)
233 return transformed_signature
235 def get_best_video(player_config
):
236 url_data_list
= player_config
["args"]["url_encoded_fmt_stream_map"].split(",")
237 js_url
= player_config
["assets"]["js"]
241 best_extension
= None
242 for url_data
in url_data_list
:
243 url_data
= urllib
.parse
.parse_qs(url_data
)
244 mimetype
= url_data
["type"][0].split(";")[0]
245 quality
= url_data
["quality"][0]
247 if "stereo3d" in url_data
:
249 if quality
not in QUALITIES
:
251 if mimetype
not in MIMETYPES
:
254 extension
= MIMETYPES
[mimetype
]
255 quality
= QUALITIES
.get(quality
, -1)
257 if best_quality
is not None and quality
< best_quality
:
260 video_url
= url_data
["url"][0]
261 if "sig" in url_data
:
262 signature
= url_data
["sig"][0]
263 elif "s" in url_data
:
264 signature
= decode_signature(js_url
, url_data
["s"][0])
269 video_url
= append_to_qs(video_url
, {"signature": signature
})
272 best_quality
= quality
273 best_extension
= extension
275 return best_url
, best_extension
277 def sanitize_filename(filename
):
279 re
.sub("\s+", " ", filename
.strip())
285 def get_video_url(page
):
286 player_config
= get_player_config(page
.scripts
)
287 if not player_config
:
288 raise VideoUnavailable(page
.unavailable_message
or "Could not find video URL")
290 video_url
, extension
= get_best_video(player_config
)
294 filename
= sanitize_filename(page
.title
)
295 filename
+= "." + extension
297 return video_url
, filename
299 class YouTubeVideoPageParser(html
.parser
.HTMLParser
):
303 self
.unavailable_message
= None
306 def handle_starttag(self
, tag
, attrs
):
308 self
._handle
_title
(tag
, attrs
)
309 self
._handle
_unavailable
_message
(tag
, attrs
)
310 self
._handle
_script
(tag
, attrs
)
312 def handle_endtag(self
, tag
):
313 self
.handle_data
= self
._ignore
_data
315 def _ignore_data(self
, _
):
318 def _handle_title(self
, tag
, attrs
):
320 self
.handle_data
= self
._handle
_title
_data
322 def _handle_title_data(self
, data
):
323 self
.title
= data
.strip()
325 def _handle_unavailable_message(self
, tag
, attrs
):
326 if attrs
.get("id", None) == "unavailable-message":
327 self
.handle_data
= self
._handle
_unavailable
_message
_data
329 def _handle_unavailable_message_data(self
, data
):
330 self
.unavailable_message
= data
.strip()
332 def _handle_script(self
, tag
, attrs
):
334 self
.handle_data
= self
._handle
_script
_data
336 def _handle_script_data(self
, data
):
338 self
.scripts
.append(data
)
def write_video(filename, video_data):
    """Send a fetched video to standard output as a file download.

    Writes a Content-Disposition header, a Content-Length header when the
    upstream response supplies one, the blank line ending the headers, and
    then the response body, all to ``sys.stdout.buffer``.

    Args:
        filename: Name offered for the download. It is UTF-8 encoded and
            percent-quoted, then sent using the ``filename*=UTF-8''...`` form.
        video_data: Response object providing ``getheader(name)`` and
            ``read()``; its body is copied to the output.
    """
    out = sys.stdout.buffer
    quoted_filename = urllib.parse.quote(filename.encode("utf-8"))

    header_lines = [
        b"Content-Disposition: attachment; filename*=UTF-8''"
        + quoted_filename.encode("utf-8")
        + b"\r\n",
    ]

    # getheader() returns None when the header is absent; omit the header
    # in that case instead of failing on None.encode().
    content_length = video_data.getheader("Content-Length")
    if content_length is not None:
        header_lines.append(
            b"Content-Length: " + content_length.encode("utf-8") + b"\r\n"
        )

    # An empty line terminates the header section.
    header_lines.append(b"\r\n")

    out.write(b"".join(header_lines))
    shutil.copyfileobj(video_data, out)
359 print_form(url
="https://www.youtube.com/watch?v=FOOBAR")
363 page
= YouTubeVideoPageParser()
366 video_url
, filename
= get_video_url(page
)
367 video_data
= urlopen(video_url
)
368 except VideoUnavailable
as e
:
371 msg
="<p class='error'>Sorry, there was an error: %s</p>" % cgi
.escape(e
.args
[0])
376 msg
="<p class='error'>Sorry, that does not look like a YouTube page!</p>"
378 except Exception as e
:
381 msg
="<p class='error'>Sorry, there was an unknown error.</p>"
385 write_video(filename
, video_data
)
388 suffixes
= ["", "KiB", "MiB", "GiB"]
389 for i
, suffix
in enumerate(suffixes
):
393 return "%.2f %s" % (size
, suffix
)
395 def copy_with_progress(content_length
, infile
, outfile
):
399 rate
= last_bytes_read
/ (now
- last_ts
)
400 sys
.stdout
.write("\33[2K\r")
401 sys
.stdout
.write("%s / %s (%s/sec)" % (
403 pp_size(content_length
),
413 if now
- last_ts
> 0.5:
418 buf
= infile
.read(32768)
422 last_bytes_read
+= len(buf
)
423 bytes_read
+= len(buf
)
433 print("Usage: %s https://youtube.com/watch?v=FOOBAR" % sys
.argv
[0], file=sys
.stderr
)
436 page
= YouTubeVideoPageParser()
438 video_url
, filename
= get_video_url(page
)
439 print("Downloading", filename
)
441 outfile
= open(filename
, "ab")
442 offset
= outfile
.tell()
444 print("Resuming download from", pp_size(offset
))
449 video_data
= urlopen(video_url
, offset
)
450 except urllib
.error
.HTTPError
as e
:
452 print("File is complete!")
457 content_length
= int(video_data
.getheader("Content-Length"))
458 if total_size
is None:
459 total_size
= content_length
462 copy_with_progress(content_length
, video_data
, outfile
)
467 if outfile
.tell() != total_size
:
469 offset
= outfile
.tell()
470 if old_offset
== offset
:
472 print("Restarting download from", pp_size(offset
))
479 if __name__
== "__main__":
480 if "SCRIPT_NAME" in os
.environ
:
485 except KeyboardInterrupt:
486 print("\nExiting...")