-#!/usr/bin/env python
+#!/usr/bin/env python3
-import cookielib
import cgi
-import itertools
+import html.parser
+import http.cookiejar
import json
-from lxml import html
import os
import re
-import resource
import shutil
import subprocess
import sys
-import urllib
-import urllib2
-import urlparse
+import time
+import urllib.error
+import urllib.parse
+import urllib.request
+# NOTE(review): MAX_MEMORY_BYTES appears unused after this patch — it removes
+# both "import resource" and the resource.setrlimit() call in __main__.
+# Confirm nothing else reads it before deleting the constant.
MAX_MEMORY_BYTES = 128 * 1024*1024
+# Pretend to be a desktop browser so YouTube serves the regular watch page.
USER_AGENT = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1"
+# Maps the MIME type advertised in the stream map to a file extension.
MIMETYPES = {
-    "video/mp4": "mp4",
-    "video/x-flv": "flv",
-    "video/3gpp": "3gp",
+    "video/mp4": "mp4",
+    "video/x-flv": "flv",
+    "video/3gpp": "3gp",
}
+# Ranking of YouTube quality labels; higher wins in get_best_video().
QUALITIES = {
-    "large": 3,
-    "medium": 2,
-    "small": 1,
+    "hd1080": 5,
+    "hd720": 4,
+    "large": 3,
+    "medium": 2,
+    "small": 1,
}
class VideoUnavailable(Exception):
-    pass
+    # Raised when the watch page loads but no downloadable stream can be
+    # produced (e.g. the page's "unavailable-message" div is present).
+    pass
+
+class NotYouTube(Exception):
+    # Raised by validate_url() for URLs that are not youtube.com / youtu.be.
+    pass
def print_form(url="", msg=""):
-    script_url = "http://%s%s" % (os.environ["HTTP_HOST"], os.environ["REQUEST_URI"])
-    sys.stdout.write("Content-Type: application/xhtml+xml\r\n\r\n")
-    sys.stdout.write("""
-<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
-<html xmlns="http://www.w3.org/1999/xhtml">
+    # Escape the values interpolated into the HTML below: |url| is attacker
+    # controlled (the ?url= query argument) and |script_url| is built from
+    # request headers, so reflecting them unescaped is an XSS hole.
+    # |msg| is deliberately left as-is: callers pass trusted markup.
+    url = html.escape(url, quote=True)
+    script_url = html.escape(
+        "https://%s%s" % (os.environ["HTTP_HOST"], os.environ["REQUEST_URI"]),
+        quote=True,
+    )
+    sys.stdout.write("Content-Type: text/html\r\n\r\n")
+    sys.stdout.write("""
+<!DOCTYPE html>
+<html>
<head>
-    <title>delx.net.au - YouTube Scraper</title>
-    <link rel="stylesheet" type="text/css" href="/style.css"/>
-    <style type="text/css">
-        input[type="text"] {
-            width: 100%;
-        }
-        .error {
-            color: red;
-        }
-    </style>
+    <title>delx.net.au - YouTube Scraper</title>
+    <link rel="stylesheet" type="text/css" href="/style.css">
+    <style type="text/css">
+        input[type="text"] {
+            width: 100%;
+        }
+        .error {
+            color: red;
+        }
+    </style>
</head>
<body>
-    <h1>delx.net.au - YouTube Scraper</h1>
-    {0}
-    <form action="" method="get">
-        <p>This page will let you easily download YouTube videos to watch offline. It
-        will automatically grab the highest quality version.</p>
-        <div><input type="text" name="url" value="{1}"/></div>
-        <div><input type="submit" value="Download!"/></div>
-    </form>
-    <p>Tip! Use this bookmarklet: <a href="javascript:(function(){window.location='{2}?url='+escape(location);})()">YouTube Download</a>
-    to easily download videos. Right-click the link and add it to bookmarks,
-    then when you're looking at a YouTube page select that bookmark from your
-    browser's bookmarks menu to download the video straight away.</p>
+    <h1>delx.net.au - YouTube Scraper</h1>
+    {0}
+    <form action="" method="get">
+        <p>This page will let you easily download YouTube videos to watch offline. It
+        will automatically grab the highest quality version.</p>
+        <div><input type="text" name="url" value="{1}"/></div>
+        <div><input type="submit" value="Download!"/></div>
+    </form>
+    <p>Tip! Use this bookmarklet: <a href="javascript:(function(){window.location='{2}?url='+escape(location);})()">YouTube Download</a>
+    to easily download videos. Right-click the link and add it to bookmarks,
+    then when you're looking at a YouTube page select that bookmark from your
+    browser's bookmarks menu to download the video straight away.</p>
</body>
</html>
""".replace("{0}", msg).replace("{1}", url).replace("{2}", script_url))
-cookiejar = cookielib.CookieJar()
-urlopener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookiejar))
+# Shared opener so YouTube session cookies persist across requests.
+cookiejar = http.cookiejar.CookieJar()
+urlopener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cookiejar))
+# Last URL fetched; sent as the Referer header on the next request.
referrer = ""
-def urlopen(url):
-    global referrer
-    req = urllib2.Request(url)
-    if referrer:
-        req.add_header("Referer", referrer)
-    referrer = url
-    req.add_header("User-Agent", USER_AGENT)
-    return urlopener.open(req)
-
-def parse_url(url):
-    f = urlopen(url)
-    doc = html.parse(f, html.HTMLParser(encoding="utf-8", recover=True))
-    f.close()
-    return doc
+def urlopen(url, offset=None):
+    # Fetch |url| with the shared cookie-aware opener.  Protocol-relative and
+    # path-only URLs are resolved against www.youtube.com.  When |offset| is
+    # given, request a byte range starting there (used to resume downloads).
+    if url.startswith("//"):
+        url = "https:" + url
+    if not url.startswith("http://") and not url.startswith("https://"):
+        url = "https://www.youtube.com" + url
+
+    global referrer
+    req = urllib.request.Request(url)
+    if not referrer:
+        referrer = url
+    else:
+        req.add_header("Referer", referrer)
+
+    req.add_header("User-Agent", USER_AGENT)
+
+    if offset:
+        req.add_header("Range", "bytes=%d-" % offset)
+
+    res = urlopener.open(req)
+
+    # Verify the server honoured the Range request.  A plain assert would be
+    # stripped under "python -O", and a silently ignored mismatch would
+    # append the wrong bytes to a resumed download.
+    content_range = res.getheader("Content-Range")
+    if content_range:
+        tokens = content_range.split()
+        if tokens[0] != "bytes":
+            raise IOError("Unexpected Content-Range unit: %s" % content_range)
+        start = int(tokens[1].split("-")[0])
+        if start != offset:
+            raise IOError("Expected range to start at %s, got %d" % (offset, start))
+    return res
+
+def validate_url(url):
+    # Accept only https links to youtube.com / youtu.be, optionally with a
+    # leading "www." — anything else raises NotYouTube.
+    parsed_url = urllib.parse.urlparse(url)
+    scheme_ok = parsed_url.scheme == "https"
+    # str.lstrip("www.") strips a *character set*, not a prefix, so hosts
+    # like "wwwyoutube.com" or "w.youtube.com" would wrongly pass the check.
+    # Strip the literal "www." prefix instead.
+    host = parsed_url.netloc
+    if host.startswith("www."):
+        host = host[len("www."):]
+    host_ok = host in ["youtube.com", "youtu.be"]
+
+    if scheme_ok and host_ok:
+        return
+    else:
+        raise NotYouTube()
+
+def parse_url(url, parser):
+    # Fetch |url| and feed the UTF-8 decoded body through |parser| (an
+    # html.parser.HTMLParser).  try/finally releases the connection even
+    # when decoding or parsing raises.
+    f = urlopen(url)
+    try:
+        parser.feed(f.read().decode("utf-8"))
+        parser.close()
+    finally:
+        f.close()
def append_to_qs(url, params):
-    r = list(urlparse.urlsplit(url))
-    qs = urlparse.parse_qs(r[3])
-    qs.update(params)
-    r[3] = urllib.urlencode(qs, True)
-    url = urlparse.urlunsplit(r)
-    return url
-
-def convert_from_old_itag(player_config):
-    url_data = urlparse.parse_qs(player_config["args"]["url_encoded_fmt_stream_map"])
-    url_data["url"] = []
-    for itag_url in url_data["itag"]:
-        pos = itag_url.find("url=")
-        url_data["url"].append(itag_url[pos+4:])
-    player_config["args"]["url_encoded_fmt_stream_map"] = urllib.urlencode(url_data, True)
-
-def get_player_config(doc):
-    player_config = None
-    for script in doc.xpath("//script"):
-        if not script.text:
-            continue
-        for line in script.text.split("\n"):
-            if "yt.playerConfig =" in line:
-                p1 = line.find("=")
-                p2 = line.rfind(";")
-                if p1 >= 0 and p2 > 0:
-                    return json.loads(line[p1+1:p2])
-            if "'PLAYER_CONFIG': " in line:
-                p1 = line.find(":")
-                if p1 >= 0:
-                    player_config = json.loads(line[p1+1:])
-                    convert_from_old_itag(player_config)
-    return player_config
+    # Merge |params| into the URL's query string and rebuild the URL.
+    parts = list(urllib.parse.urlsplit(url))
+    query = urllib.parse.parse_qs(parts[3])
+    query.update(params)
+    parts[3] = urllib.parse.urlencode(query, True)
+    return urllib.parse.urlunsplit(parts)
+
+def get_player_config(scripts):
+    # Scan inline <script> bodies for the "ytplayer.config = {...};"
+    # assignment and return the parsed JSON object; returns None (implicitly)
+    # when no script contains it.  The dead "player_config = None" local from
+    # the previous revision is dropped — it was never reassigned.
+    for script in scripts:
+        for line in script.split("\n"):
+            s = "ytplayer.config = {"
+            if s in line:
+                p1 = line.find(s) + len(s) - 1
+                # NOTE(review): this takes the first "};" after the opening
+                # brace; a "};" inside a JSON string would truncate the match.
+                p2 = line.find("};", p1) + 1
+                if p1 >= 0 and p2 > 0:
+                    return json.loads(line[p1:p2])
+
+def extract_js(script):
+    # Strip the known wrapper around YouTube's player JS so the bare code can
+    # be executed inside the node "vm" sandbox (see decode_signature).
+    PREFIX = "var _yt_player={};(function(g){var window=this;"
+    SUFFIX = ";})(_yt_player);\n"
+    # An AssertionError here means YouTube changed the player-JS wrapper.
+    assert script.startswith(PREFIX)
+    assert script.endswith(SUFFIX)
+
+    return script[len(PREFIX):-len(SUFFIX)]
+
+def find_func_name(script):
+    # Locate the name of the signature-transform function inside the player
+    # JS: an identifier called with an argument list ending in ".s)".
+    FUNC_NAME = R"([a-zA-Z0-9$]+)"
+    FUNC_PARAMS = R"(\([a-zA-Z,\.]+\.s\))"
+    TERMINATOR = R"[,;\)]"
+    PATTERN = FUNC_NAME + FUNC_PARAMS + TERMINATOR
+
+    match = re.search(PATTERN, script)
+    if match is None:
+        # Fail with a clear message instead of an opaque AttributeError on
+        # .groups() when YouTube changes the player JS.
+        raise Exception("Could not find signature function in player JS")
+    func_name = match.groups()[0]
+    return func_name
+
+def decode_signature(js_url, signature):
+    # Run YouTube's own player JS inside a node "vm" sandbox to transform an
+    # obfuscated stream signature (the "s" parameter) into a usable one.
+    # Requires the "node" binary on PATH.
+    f = urlopen(js_url)
+    script = f.read().decode("utf-8")
+    f.close()
+
+    func_name = find_func_name(script)
+
+    # json.dumps produces safely quoted JS literals for the signature and the
+    # extracted player code, which are spliced into the template below.
+    params = {
+        "func_name": func_name,
+        "signature": json.dumps(signature),
+        "code": json.dumps(extract_js(script)),
+    }
+    # NOTE(review): shell=True with the constant command "node" carries no
+    # injection risk here, but shell=False would be tidier.
+    p = subprocess.Popen(
+        "node",
+        shell=True,
+        close_fds=True,
+        stdin=subprocess.PIPE,
+        stdout=subprocess.PIPE
+    )
+    js_decode_script = ("""
+        const vm = require('vm');
+
+        const sandbox = {
+            location: {
+                hash: '',
+                href: '',
+                protocol: 'http:'
+            },
+            history: {
+                pushState: function(){}
+            },
+            document: {},
+            navigator: {
+                userAgent: ''
+            },
+            XMLHttpRequest: class XMLHttpRequest {},
+            matchMedia: () => ({matches: () => {}, media: ''}),
+            signature: %(signature)s,
+            transformed_signature: null,
+            g: function(){} // this is _yt_player
+        };
+        sandbox.window = sandbox;
+
+        const code_string = %(code)s + ';';
+        const exec_string = 'transformed_signature = %(func_name)s("", "MARKER", signature);';
+        vm.runInNewContext(code_string + exec_string, sandbox);
+
+        function findSignature(obj) {
+            if (typeof obj !== 'object') {
+                return;
+            }
+            for (const [key, value] of Object.entries(obj)) {
+                if (key === 'MARKER') {
+                    return value;
+                }
+                const result = findSignature(value);
+                if (result) {
+                    return result;
+                }
+            }
+        }
+        console.log(findSignature(sandbox.transformed_signature));
+    """ % params)
+
+    # Write the whole program, close stdin, then read the single-line result.
+    p.stdin.write(js_decode_script.encode("utf-8"))
+    p.stdin.close()
+
+    transformed_signature = p.stdout.read().decode("utf-8").strip()
+    if p.wait() != 0:
+        raise Exception("js failed to execute: %d" % p.returncode)
+
+    return transformed_signature
def get_best_video(player_config):
-    url_data = urlparse.parse_qs(player_config["args"]["url_encoded_fmt_stream_map"])
-    url_data = itertools.izip_longest(
-        url_data["url"],
-        url_data["type"],
-        url_data["quality"],
-        url_data.get("sig", []),
-    )
-    best_url = None
-    best_quality = None
-    best_extension = None
-    for video_url, mimetype, quality, signature in url_data:
-        mimetype = mimetype.split(";")[0]
-        if mimetype not in MIMETYPES:
-            continue
-        extension = MIMETYPES[mimetype]
-        quality = QUALITIES.get(quality.split(",")[0], -1)
-        if best_quality is None or quality > best_quality:
-            if signature:
-                video_url = append_to_qs(video_url, {"signature": signature})
-            best_url = video_url
-            best_quality = quality
-            best_extension = extension
-
-    return best_url, best_extension
+    # Pick the highest-quality stream that passes the MIMETYPES/QUALITIES
+    # filters.  Returns (url, extension), or (None, None) when nothing
+    # qualifies.  On equal quality the later stream-map entry wins.
+    url_data_list = player_config["args"]["url_encoded_fmt_stream_map"].split(",")
+    js_url = player_config["assets"]["js"]
+
+    best_url = None
+    best_quality = None
+    best_extension = None
+    for url_data in url_data_list:
+        url_data = urllib.parse.parse_qs(url_data)
+        mimetype = url_data["type"][0].split(";")[0]
+        quality = url_data["quality"][0]
+
+        # Skip 3D streams and anything with an unknown quality or container.
+        if "stereo3d" in url_data:
+            continue
+        if quality not in QUALITIES:
+            continue
+        if mimetype not in MIMETYPES:
+            continue
+
+        extension = MIMETYPES[mimetype]
+        quality = QUALITIES.get(quality, -1)
+
+        if best_quality is not None and quality < best_quality:
+            continue
+
+        video_url = url_data["url"][0]
+        # "sig" is a ready-to-use signature; "s" is obfuscated and must be
+        # transformed by the player JS (spawns a node subprocess).
+        if "sig" in url_data:
+            signature = url_data["sig"][0]
+        elif "s" in url_data:
+            signature = decode_signature(js_url, url_data["s"][0])
+        else:
+            signature = None
+
+        if signature:
+            video_url = append_to_qs(video_url, {"signature": signature})
+
+        best_url = video_url
+        best_quality = quality
+        best_extension = extension
+
+    return best_url, best_extension
def sanitize_filename(filename):
-    return (
-        re.sub("\s+", " ", filename.strip())
-        .replace("\\", "-")
-        .replace("/", "-")
-        .replace("\0", " ")
-    )
+    # Collapse runs of whitespace and replace path separators / NUL bytes so
+    # the video title is safe to use as a filename.  r"\s+" avoids the
+    # invalid-escape DeprecationWarning (a SyntaxWarning from Python 3.12).
+    return (
+        re.sub(r"\s+", " ", filename.strip())
+        .replace("\\", "-")
+        .replace("/", "-")
+        .replace("\0", " ")
+    )
+
+def get_video_url(page):
+    # Returns (video_url, filename) for the best stream on the parsed watch
+    # page, or (None, None) when no stream passed the filters.  Raises
+    # VideoUnavailable when the page carries no player config at all.
+    player_config = get_player_config(page.scripts)
+    if not player_config:
+        raise VideoUnavailable(page.unavailable_message or "Could not find video URL")
+
+    video_url, extension = get_best_video(player_config)
+    if not video_url:
+        return None, None
+
+    filename = sanitize_filename(page.title)
+    filename += "." + extension
-def get_video_url(doc):
-    unavailable = doc.xpath("//div[@id='unavailable-message']/text()")
-    if unavailable:
-        raise VideoUnavailable(unavailable[0].strip())
+    return video_url, filename
-    player_config = get_player_config(doc)
-    if not player_config:
-        raise VideoUnavailable("Could not find video URL")
+class YouTubeVideoPageParser(html.parser.HTMLParser):
+    # Collects what get_video_url() needs from a watch page: the <title>
+    # text, the "unavailable-message" div text, and all inline <script>
+    # bodies.  Technique: interesting start tags install a handle_data
+    # callback for the text that follows; every end tag resets handle_data
+    # to a no-op.
+    def __init__(self):
+        super().__init__()
+        self.title = None
+        self.unavailable_message = None
+        self.scripts = []
-    video_url, extension = get_best_video(player_config)
-    if not video_url:
-        return None, None
+    def handle_starttag(self, tag, attrs):
+        attrs = dict(attrs)
+        self._handle_title(tag, attrs)
+        self._handle_unavailable_message(tag, attrs)
+        self._handle_script(tag, attrs)
-    title = doc.xpath("/html/head/title/text()")[0]
-    filename = sanitize_filename(title)
-    filename += "." + extension
+    def handle_endtag(self, tag):
+        # Stop capturing text once the element that installed a handler ends.
+        self.handle_data = self._ignore_data
-    return video_url, filename
+    def _ignore_data(self, _):
+        pass
+
+    def _handle_title(self, tag, attrs):
+        if tag == "title":
+            self.handle_data = self._handle_title_data
+
+    def _handle_title_data(self, data):
+        self.title = data.strip()
+
+    def _handle_unavailable_message(self, tag, attrs):
+        if attrs.get("id", None) == "unavailable-message":
+            self.handle_data = self._handle_unavailable_message_data
+
+    def _handle_unavailable_message_data(self, data):
+        self.unavailable_message = data.strip()
+
+    def _handle_script(self, tag, attrs):
+        if tag == "script":
+            self.handle_data = self._handle_script_data
+
+    def _handle_script_data(self, data):
+        if data:
+            self.scripts.append(data)
def write_video(filename, video_data):
-    httpinfo = video_data.info()
-    encoded_filename = urllib.quote(filename.encode("utf-8"))
-    sys.stdout.write("Content-Disposition: attachment; filename*=UTF-8''%s\r\n" % encoded_filename)
-    sys.stdout.write("Content-Length: %s\r\n" % httpinfo.getheader("Content-Length"))
-    sys.stdout.write("\r\n")
-    shutil.copyfileobj(video_data, sys.stdout)
-    video_data.close()
+    # Stream the video back to the CGI client, advertising a percent-encoded
+    # UTF-8 (RFC 5987 style) attachment filename.
+    quoted_filename = urllib.parse.quote(filename.encode("utf-8"))
+    disposition = "Content-Disposition: attachment; filename*=UTF-8''%s\r\n" % quoted_filename
+    sys.stdout.buffer.write(disposition.encode("utf-8"))
+    length_header = "Content-Length: %s\r\n" % video_data.getheader("Content-Length")
+    sys.stdout.buffer.write(length_header.encode("utf-8"))
+    sys.stdout.buffer.write(b"\r\n")
+    # Copy the body through the binary stdout buffer without decoding.
+    shutil.copyfileobj(video_data, sys.stdout.buffer)
+    video_data.close()
def cgimain():
-    args = cgi.parse()
-    try:
-        url = args["url"][0]
-    except:
-        print_form(url="http://www.youtube.com/watch?v=FOOBAR")
-        return
-
-    try:
-        doc = parse_url(url)
-        video_url, filename = get_video_url(doc)
-        video_data = urlopen(video_url)
-        write_video(filename, video_data)
-    except VideoUnavailable, e:
-        print_form(
-            url=url,
-            msg="<p class='error'>Sorry, there was an error: %s</p>" % cgi.escape(e.message)
-        )
-    except Exception, e:
-        print_form(
-            url=url,
-            msg="<p class='error'>Sorry, there was an error. Check your URL?</p>"
-        )
-    return
+    args = cgi.parse()
+    try:
+        url = args["url"][0]
+    except (KeyError, IndexError):
+        # No ?url= argument yet: show the empty form with an example URL.
+        print_form(url="https://www.youtube.com/watch?v=FOOBAR")
+        return
+
+    try:
+        page = YouTubeVideoPageParser()
+        validate_url(url)
+        parse_url(url, page)
+        video_url, filename = get_video_url(page)
+        video_data = urlopen(video_url)
+    except VideoUnavailable as e:
+        print_form(
+            url=url,
+            # cgi.escape was removed in Python 3.8; html.escape is the
+            # supported replacement.
+            msg="<p class='error'>Sorry, there was an error: %s</p>" % html.escape(e.args[0])
+        )
+        return
+    except NotYouTube:
+        print_form(
+            url=url,
+            msg="<p class='error'>Sorry, that does not look like a YouTube page!</p>"
+        )
+        return
+    except Exception:
+        print_form(
+            url=url,
+            msg="<p class='error'>Sorry, there was an unknown error.</p>"
+        )
+        return
+
+    # Each error handler returns above; the previous revision had a bare
+    # function-level "return" here, which made write_video() unreachable and
+    # meant the video body was never sent.
+    write_video(filename, video_data)
+
+def pp_size(size):
+    # Human-readable byte count, e.g. 1536 -> "1.50 KiB".  Values of a TiB
+    # or more are still reported in GiB (the largest suffix available).
+    suffixes = ["", "KiB", "MiB", "GiB"]
+    for suffix in suffixes:
+        if size < 1024:
+            break
+        size /= 1024
+    return "%.2f %s" % (size, suffix)
+
+def copy_with_progress(content_length, infile, outfile):
+    # Copy infile to outfile in 32 KiB chunks, redrawing a one-line progress
+    # display (bytes so far / total, plus transfer rate) about twice a second.
+    def print_status():
+        # Rate over the window since the last redraw; reads now/last_ts/
+        # last_bytes_read/bytes_read from the enclosing scope.
+        rate = 0
+        if now != last_ts:
+            rate = last_bytes_read / (now - last_ts)
+        # "\33[2K" erases the current terminal line, "\r" returns the cursor.
+        sys.stdout.write("\33[2K\r")
+        sys.stdout.write("%s / %s (%s/sec)" % (
+            pp_size(bytes_read),
+            pp_size(content_length),
+            pp_size(rate),
+        ))
+        sys.stdout.flush()
+
+    last_ts = 0
+    last_bytes_read = 0
+    bytes_read = 0
+    while True:
+        now = time.time()
+        if now - last_ts > 0.5:
+            print_status()
+            last_ts = now
+            last_bytes_read = 0
+
+        buf = infile.read(32768)
+        if not buf:
+            break
+        outfile.write(buf)
+        last_bytes_read += len(buf)
+        bytes_read += len(buf)
+
+    # Newline at the end
+    print_status()
+    print()
def main():
-    try:
-        url = sys.argv[1]
-    except:
-        print >>sys.stderr, "Usage: %s http://youtube.com/watch?v=FOOBAR" % sys.argv[0]
-        sys.exit(1)
-    doc = parse_url(url)
-    video_url, filename = get_video_url(doc)
-    data = urlopen(video_url)
-    outfile = open(filename, "w")
-    shutil.copyfileobj(data, outfile)
-    data.close()
-    outfile.close()
+    try:
+        url = sys.argv[1]
+    except IndexError:
+        print("Usage: %s https://youtube.com/watch?v=FOOBAR" % sys.argv[0], file=sys.stderr)
+        sys.exit(1)
+
+    page = YouTubeVideoPageParser()
+    parse_url(url, page)
+    video_url, filename = get_video_url(page)
+    print("Downloading", filename)
+
+    # Append mode: a previously interrupted download resumes from the
+    # current file size.
+    outfile = open(filename, "ab")
+    offset = outfile.tell()
+    if offset > 0:
+        print("Resuming download from", pp_size(offset))
+    total_size = None
+
+    while True:
+        try:
+            video_data = urlopen(video_url, offset)
+        except urllib.error.HTTPError as e:
+            # 416 = requested range starts at/after EOF: nothing left.
+            if e.code == 416:
+                print("File is complete!")
+                break
+            else:
+                raise
+
+        content_length = int(video_data.getheader("Content-Length"))
+        if total_size is None:
+            total_size = content_length
+
+        try:
+            copy_with_progress(content_length, video_data, outfile)
+        except IOError:
+            # Dropped connection: fall through and retry from the new offset.
+            print()
+
+        video_data.close()
+        if outfile.tell() != total_size:
+            old_offset = offset
+            offset = outfile.tell()
+            # No forward progress: pause briefly before retrying.
+            if old_offset == offset:
+                time.sleep(1)
+            print("Restarting download from", pp_size(offset))
+        else:
+            break
+
+    outfile.close()
if __name__ == "__main__":
-    resource.setrlimit(resource.RLIMIT_AS, (MAX_MEMORY_BYTES, MAX_MEMORY_BYTES))
-    if os.environ.has_key("SCRIPT_NAME"):
-        cgimain()
-    else:
-        main()
+    # SCRIPT_NAME is set by CGI servers; its presence selects the web entry
+    # point, otherwise run as a command-line downloader.
+    if "SCRIPT_NAME" in os.environ:
+        cgimain()
+    else:
+        # Ctrl-C exits cleanly instead of dumping a traceback.
+        try:
+            main()
+        except KeyboardInterrupt:
+            print("\nExiting...")
+            sys.exit(1)