-#!/usr/bin/python2
+#!/usr/bin/env python3
-from __future__ import division
-
-import cookielib
import cgi
+import html.parser
+import http.cookiejar
import json
-from lxml import html
import os
import re
-import resource
import shutil
import subprocess
import sys
import time
-import urllib
-import urllib2
-import urlparse
+import urllib.error
+import urllib.parse
+import urllib.request
-MAX_MEMORY_BYTES = 128 * 1024*1024
-USER_AGENT = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1"
+USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64; rv:67.0) Gecko/20100101 Firefox/67.0"
MIMETYPES = {
"video/mp4": "mp4",
class VideoUnavailable(Exception):
pass
+class NotYouTube(Exception):
+ pass
+
def print_form(url="", msg=""):
- script_url = "http://%s%s" % (os.environ["HTTP_HOST"], os.environ["REQUEST_URI"])
- sys.stdout.write("Content-Type: application/xhtml+xml\r\n\r\n")
+ script_url = "https://%s%s" % (os.environ["HTTP_HOST"], os.environ["REQUEST_URI"])
+ sys.stdout.write("Content-Type: text/html\r\n\r\n")
sys.stdout.write("""
-<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
-<html xmlns="http://www.w3.org/1999/xhtml">
+<!DOCTYPE html>
+<html>
<head>
<title>delx.net.au - YouTube Scraper</title>
- <link rel="stylesheet" type="text/css" href="/style.css"/>
+ <link rel="stylesheet" type="text/css" href="/style.css">
<style type="text/css">
input[type="text"] {
width: 100%;
</html>
""".replace("{0}", msg).replace("{1}", url).replace("{2}", script_url))
-cookiejar = cookielib.CookieJar()
-urlopener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookiejar))
+cookiejar = http.cookiejar.CookieJar()
+urlopener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cookiejar))
referrer = ""
def urlopen(url, offset=None):
if url.startswith("//"):
- url = "http:" + url
+ url = "https:" + url
+ if not url.startswith("http://") and not url.startswith("https://"):
+ url = "https://www.youtube.com" + url
global referrer
- req = urllib2.Request(url)
+ req = urllib.request.Request(url)
if not referrer:
referrer = url
else:
res = urlopener.open(req)
- content_range = res.info().getheader("Content-Range")
+ content_range = res.getheader("Content-Range")
if content_range:
tokens = content_range.split()
assert tokens[0] == "bytes"
assert start == offset
return res
-def parse_url(url):
+def validate_url(url):
+    # Reject anything that is not an HTTPS YouTube URL before we fetch it,
+    # so the CGI endpoint cannot be used as an open proxy.
+    parsed_url = urllib.parse.urlparse(url)
+    scheme_ok = parsed_url.scheme == "https"
+    # NB: netloc.lstrip("www.") would strip *characters* from the set
+    # {'w', '.'}, not the prefix -- e.g. "wwwyoutube.com" would pass.
+    # Remove the literal "www." prefix explicitly instead.
+    host = parsed_url.netloc
+    if host.startswith("www."):
+        host = host[len("www."):]
+    host_ok = host in ["youtube.com", "youtu.be"]
+
+    if scheme_ok and host_ok:
+        return
+    else:
+        raise NotYouTube()
+
+def parse_url(url, parser):
f = urlopen(url)
- doc = html.parse(f, html.HTMLParser(encoding="utf-8", recover=True))
+ parser.feed(f.read().decode("utf-8"))
+ parser.close()
f.close()
- return doc
def append_to_qs(url, params):
- r = list(urlparse.urlsplit(url))
- qs = urlparse.parse_qs(r[3])
+ r = list(urllib.parse.urlsplit(url))
+ qs = urllib.parse.parse_qs(r[3])
qs.update(params)
- r[3] = urllib.urlencode(qs, True)
- url = urlparse.urlunsplit(r)
+ r[3] = urllib.parse.urlencode(qs, True)
+ url = urllib.parse.urlunsplit(r)
return url
-def get_player_config(doc):
+def get_player_config(scripts):
player_config = None
- for script in doc.xpath("//script"):
- if not script.text:
- continue
- for line in script.text.split("\n"):
+ for script in scripts:
+ for line in script.split("\n"):
s = "ytplayer.config = {"
if s in line:
p1 = line.find(s) + len(s) - 1
return script[len(PREFIX):-len(SUFFIX)]
-def find_func_name(script):
+def find_cipher_func(script):
+ FUNC_NAME = R"([a-zA-Z0-9$]+)"
+ DECODE_URI_COMPONENT = R"(\(decodeURIComponent)?"
+ FUNC_PARAMS = R"(\([a-zA-Z,\.]+\.s\))"
+ TERMINATOR = R"[,;\)]"
+ PATTERN = FUNC_NAME + DECODE_URI_COMPONENT + FUNC_PARAMS + TERMINATOR
+
+ match = re.search(PATTERN, script)
+ func_name = match.groups()[0]
+ return func_name
+
+def find_url_func(script):
FUNC_NAME = R"([a-zA-Z0-9$]+)"
- FUNC_PARAMS = R"(\([a-zA-Z]+\.s\))"
- PATTERN = FUNC_NAME + FUNC_PARAMS + ";"
+ PATTERN = R"this\.url\s*=\s*" + FUNC_NAME + R"\s*\(\s*this\s*\)"
match = re.search(PATTERN, script)
func_name = match.groups()[0]
return func_name
-def decode_signature(js_url, signature):
- script = urlopen(js_url).read()
- func_name = find_func_name(script)
+def decode_cipher_url(js_url, cipher):
+ cipher = urllib.parse.parse_qs(cipher)
+ args = [
+ cipher["url"][0],
+ cipher["sp"][0],
+ cipher["s"][0],
+ ]
+
+ f = urlopen(js_url)
+ script = f.read().decode("utf-8")
+ f.close()
+
+ cipher_func_name = find_cipher_func(script)
+ url_func_name = find_url_func(script)
params = {
- "func_name": func_name,
- "signature": json.dumps(signature),
+ "cipher_func_name": cipher_func_name,
+ "url_func_name": url_func_name,
+ "args": json.dumps(args),
"code": json.dumps(extract_js(script)),
}
p = subprocess.Popen(
- "nodejs",
+ "node",
shell=True,
close_fds=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE
)
js_decode_script = ("""
- var vm = require('vm');
-
- var sandbox = {
- location: {
- hash: '',
- href: '',
- protocol: 'http:'
- },
- history: {
- pushState: function(){}
- },
- document: {},
- navigator: {},
- signature: %(signature)s,
- transformed_signature: null
+ const vm = require('vm');
+
+ const fakeGlobal = {};
+ fakeGlobal.window = fakeGlobal;
+ fakeGlobal.location = {
+ hash: '',
+ host: 'www.youtube.com',
+ hostname: 'www.youtube.com',
+ href: 'https://www.youtube.com',
+ origin: 'https://www.youtube.com',
+ pathname: '/',
+ protocol: 'https:'
+ };
+ fakeGlobal.history = {
+ pushState: function(){}
+ };
+        fakeGlobal.document = {
+            location: fakeGlobal.location
+        };
+ fakeGlobal.navigator = {
+ userAgent: ''
};
- sandbox.window = sandbox;
+ fakeGlobal.XMLHttpRequest = class XMLHttpRequest {};
+ fakeGlobal.matchMedia = () => ({matches: () => {}, media: ''});
+ fakeGlobal.result_url = null;
+ fakeGlobal.g = function(){}; // this is _yt_player
- var code_string = %(code)s + ';';
- var exec_string = 'transformed_signature = %(func_name)s(signature);';
- vm.runInNewContext(code_string + exec_string, sandbox);
+ const code_string = %(code)s + ';';
+ const exec_string = 'result_url = %(url_func_name)s(%(cipher_func_name)s(...%(args)s));';
+ vm.runInNewContext(code_string + exec_string, fakeGlobal);
- console.log(sandbox.transformed_signature);
+ console.log(fakeGlobal.result_url);
""" % params)
- p.stdin.write(js_decode_script)
+ p.stdin.write(js_decode_script.encode("utf-8"))
p.stdin.close()
- transformed_signature = p.stdout.read().strip()
+ result_url = p.stdout.read().decode("utf-8").strip()
if p.wait() != 0:
raise Exception("js failed to execute: %d" % p.returncode)
- return transformed_signature
+ return result_url
def get_best_video(player_config):
- url_data_list = player_config["args"]["url_encoded_fmt_stream_map"].split(",")
js_url = player_config["assets"]["js"]
+ player_args = player_config["args"]
+ player_response = json.loads(player_args["player_response"])
+ formats = player_response["streamingData"]["formats"]
+
best_url = None
best_quality = None
best_extension = None
- for url_data in url_data_list:
- url_data = urlparse.parse_qs(url_data)
- mimetype = url_data["type"][0].split(";")[0]
- quality = url_data["quality"][0]
+ for format_data in formats:
+ mimetype = format_data["mimeType"].split(";")[0]
+ quality = format_data["quality"]
- if url_data.has_key("stereo3d"):
- continue
if quality not in QUALITIES:
continue
if mimetype not in MIMETYPES:
if best_quality is not None and quality < best_quality:
continue
- video_url = url_data["url"][0]
- if "sig" in url_data:
- signature = url_data["sig"][0]
- elif "s" in url_data:
- signature = decode_signature(js_url, url_data["s"][0])
+ if "signatureCipher" in format_data:
+ video_url = decode_cipher_url(js_url, format_data["signatureCipher"])
else:
- signature = None
-
- if signature:
- video_url = append_to_qs(video_url, {"signature": signature})
+ video_url = format_data["url"]
best_url = video_url
best_quality = quality
.replace("\0", " ")
)
-def get_video_url(doc):
- unavailable = doc.xpath("//div[@id='unavailable-message']/text()")
- if unavailable:
- raise VideoUnavailable(unavailable[0].strip())
-
- player_config = get_player_config(doc)
+def get_video_url(page):
+ player_config = get_player_config(page.scripts)
if not player_config:
- raise VideoUnavailable("Could not find video URL")
+ raise VideoUnavailable(page.unavailable_message or "Could not find video URL")
video_url, extension = get_best_video(player_config)
if not video_url:
return None, None
- title = doc.xpath("/html/head/title/text()")[0]
- filename = sanitize_filename(title)
- filename += "." + extension
+ title = player_config["args"].get("title", None)
+ if not title:
+ title = json.loads(player_config["args"]["player_response"])["videoDetails"]["title"]
+ if not title:
+ title = "Unknown title"
+
+ filename = sanitize_filename(title) + "." + extension
return video_url, filename
+class YouTubeVideoPageParser(html.parser.HTMLParser):
+ def __init__(self):
+ super().__init__()
+ self.unavailable_message = None
+ self.scripts = []
+
+ def handle_starttag(self, tag, attrs):
+ attrs = dict(attrs)
+ self._handle_unavailable_message(tag, attrs)
+ self._handle_script(tag, attrs)
+
+ def handle_endtag(self, tag):
+ self.handle_data = self._ignore_data
+
+ def _ignore_data(self, _):
+ pass
+
+ def _handle_unavailable_message(self, tag, attrs):
+ if attrs.get("id", None) == "unavailable-message":
+ self.handle_data = self._handle_unavailable_message_data
+
+ def _handle_unavailable_message_data(self, data):
+ self.unavailable_message = data.strip()
+
+ def _handle_script(self, tag, attrs):
+ if tag == "script":
+ self.handle_data = self._handle_script_data
+
+ def _handle_script_data(self, data):
+ if data:
+ self.scripts.append(data)
+
def write_video(filename, video_data):
- httpinfo = video_data.info()
- encoded_filename = urllib.quote(filename.encode("utf-8"))
- sys.stdout.write("Content-Disposition: attachment; filename*=UTF-8''%s\r\n" % encoded_filename)
- sys.stdout.write("Content-Length: %s\r\n" % httpinfo.getheader("Content-Length"))
- sys.stdout.write("\r\n")
- shutil.copyfileobj(video_data, sys.stdout)
+ quoted_filename = urllib.parse.quote(filename.encode("utf-8"))
+ sys.stdout.buffer.write(
+ b"Content-Disposition: attachment; filename*=UTF-8''{0}\r\n"
+ .replace(b"{0}", quoted_filename.encode("utf-8"))
+ )
+ sys.stdout.buffer.write(
+ b"Content-Length: {0}\r\n"
+ .replace(b"{0}", video_data.getheader("Content-Length").encode("utf-8"))
+ )
+ sys.stdout.buffer.write(b"\r\n")
+ shutil.copyfileobj(video_data, sys.stdout.buffer)
video_data.close()
def cgimain():
try:
url = args["url"][0]
except:
- print_form(url="http://www.youtube.com/watch?v=FOOBAR")
+ print_form(url="https://www.youtube.com/watch?v=FOOBAR")
return
try:
- doc = parse_url(url)
- video_url, filename = get_video_url(doc)
+ page = YouTubeVideoPageParser()
+ validate_url(url)
+ parse_url(url, page)
+ video_url, filename = get_video_url(page)
video_data = urlopen(video_url)
- write_video(filename, video_data)
- except VideoUnavailable, e:
+    except VideoUnavailable as e:
         print_form(
             url=url,
-            msg="<p class='error'>Sorry, there was an error: %s</p>" % cgi.escape(e.message)
+            # cgi.escape was deprecated in 3.2 and removed in Python 3.8;
+            # html.escape is the supported replacement (the html package is
+            # already imported via "import html.parser" above).
+            msg="<p class='error'>Sorry, there was an error: %s</p>" % html.escape(e.args[0])
         )
- except Exception, e:
+ except NotYouTube:
print_form(
url=url,
- msg="<p class='error'>Sorry, there was an error. Check your URL?</p>"
+ msg="<p class='error'>Sorry, that does not look like a YouTube page!</p>"
+ )
+ except Exception as e:
+ print_form(
+ url=url,
+ msg="<p class='error'>Sorry, there was an unknown error.</p>"
)
return
+ write_video(filename, video_data)
+
def pp_size(size):
suffixes = ["", "KiB", "MiB", "GiB"]
for i, suffix in enumerate(suffixes):
# Newline at the end
print_status()
- print
+ print()
def main():
try:
url = sys.argv[1]
except:
- print >>sys.stderr, "Usage: %s http://youtube.com/watch?v=FOOBAR" % sys.argv[0]
+ print("Usage: %s https://youtube.com/watch?v=FOOBAR" % sys.argv[0], file=sys.stderr)
sys.exit(1)
- doc = parse_url(url)
- video_url, filename = get_video_url(doc)
- print "Downloading", filename.encode("utf-8")
+ page = YouTubeVideoPageParser()
+ parse_url(url, page)
+ video_url, filename = get_video_url(page)
+ print("Downloading", filename)
- outfile = open(filename, "a")
+ outfile = open(filename, "ab")
offset = outfile.tell()
if offset > 0:
- print "Resuming download from", pp_size(offset)
+ print("Resuming download from", pp_size(offset))
total_size = None
while True:
try:
video_data = urlopen(video_url, offset)
- except urllib2.HTTPError, e:
+ except urllib.error.HTTPError as e:
if e.code == 416:
- print "File is complete!"
+ print("File is complete!")
break
else:
raise
- content_length = int(video_data.info().getheader("Content-Length"))
+ content_length = int(video_data.getheader("Content-Length"))
if total_size is None:
total_size = content_length
try:
copy_with_progress(content_length, video_data, outfile)
- except IOError, e:
- print
+ except IOError as e:
+ print()
video_data.close()
if outfile.tell() != total_size:
offset = outfile.tell()
if old_offset == offset:
time.sleep(1)
- print "Restarting download from", pp_size(offset)
+ print("Restarting download from", pp_size(offset))
else:
break
if __name__ == "__main__":
-### resource.setrlimit(resource.RLIMIT_AS, (MAX_MEMORY_BYTES, MAX_MEMORY_BYTES))
- if os.environ.has_key("SCRIPT_NAME"):
+ if "SCRIPT_NAME" in os.environ:
cgimain()
else:
try:
main()
except KeyboardInterrupt:
- print "\nExiting..."
+ print("\nExiting...")
sys.exit(1)