-#!/usr/bin/python2
+#!/usr/bin/env python3
-from __future__ import division
-
-import cookielib
import cgi
+import html.parser
+import http.cookiejar
import json
-from lxml import html
import os
import re
-import resource
import shutil
import subprocess
import sys
import time
-import urllib
-import urllib2
-import urlparse
+import urllib.error
+import urllib.parse
+import urllib.request
MAX_MEMORY_BYTES = 128 * 1024*1024
def print_form(url="", msg=""):
script_url = "http://%s%s" % (os.environ["HTTP_HOST"], os.environ["REQUEST_URI"])
- sys.stdout.write("Content-Type: application/xhtml+xml\r\n\r\n")
+ sys.stdout.write("Content-Type: text/html\r\n\r\n")
sys.stdout.write("""
-<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
-<html xmlns="http://www.w3.org/1999/xhtml">
+<!DOCTYPE html>
+<html>
<head>
<title>delx.net.au - YouTube Scraper</title>
- <link rel="stylesheet" type="text/css" href="/style.css"/>
+ <link rel="stylesheet" type="text/css" href="/style.css">
<style type="text/css">
input[type="text"] {
width: 100%;
</html>
""".replace("{0}", msg).replace("{1}", url).replace("{2}", script_url))
-cookiejar = cookielib.CookieJar()
-urlopener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookiejar))
+cookiejar = http.cookiejar.CookieJar()
+urlopener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cookiejar))
referrer = ""
def urlopen(url, offset=None):
url = "https://www.youtube.com" + url
global referrer
- req = urllib2.Request(url)
+ req = urllib.request.Request(url)
if not referrer:
referrer = url
else:
res = urlopener.open(req)
- content_range = res.info().getheader("Content-Range")
+ content_range = res.getheader("Content-Range")
if content_range:
tokens = content_range.split()
assert tokens[0] == "bytes"
assert start == offset
return res
-def parse_url(url):
+def parse_url(url, parser):
f = urlopen(url)
- doc = html.parse(f, html.HTMLParser(encoding="utf-8", recover=True))
+ parser.feed(f.read().decode("utf-8"))
+ parser.close()
f.close()
- return doc
def append_to_qs(url, params):
- r = list(urlparse.urlsplit(url))
- qs = urlparse.parse_qs(r[3])
+ r = list(urllib.parse.urlsplit(url))
+ qs = urllib.parse.parse_qs(r[3])
qs.update(params)
- r[3] = urllib.urlencode(qs, True)
- url = urlparse.urlunsplit(r)
+ r[3] = urllib.parse.urlencode(qs, True)
+ url = urllib.parse.urlunsplit(r)
return url
-def get_player_config(doc):
+def get_player_config(scripts):
player_config = None
- for script in doc.xpath("//script"):
- if not script.text:
- continue
- for line in script.text.split("\n"):
+ for script in scripts:
+ for line in script.split("\n"):
s = "ytplayer.config = {"
if s in line:
p1 = line.find(s) + len(s) - 1
return func_name
def decode_signature(js_url, signature):
- script = urlopen(js_url).read()
+ f = urlopen(js_url)
+ script = f.read().decode("utf-8")
+ f.close()
+
func_name = find_func_name(script)
params = {
"code": json.dumps(extract_js(script)),
}
p = subprocess.Popen(
- "nodejs",
+ "node",
shell=True,
close_fds=True,
stdin=subprocess.PIPE,
console.log(sandbox.transformed_signature);
""" % params)
- p.stdin.write(js_decode_script)
+ p.stdin.write(js_decode_script.encode("utf-8"))
p.stdin.close()
- transformed_signature = p.stdout.read().strip()
+ transformed_signature = p.stdout.read().decode("utf-8").strip()
if p.wait() != 0:
raise Exception("js failed to execute: %d" % p.returncode)
best_quality = None
best_extension = None
for url_data in url_data_list:
- url_data = urlparse.parse_qs(url_data)
+ url_data = urllib.parse.parse_qs(url_data)
mimetype = url_data["type"][0].split(";")[0]
quality = url_data["quality"][0]
- if url_data.has_key("stereo3d"):
+ if "stereo3d" in url_data:
continue
if quality not in QUALITIES:
continue
.replace("\0", " ")
)
-def get_video_url(doc):
- unavailable = doc.xpath("//div[@id='unavailable-message']/text()")
- if unavailable:
- raise VideoUnavailable(unavailable[0].strip())
-
- player_config = get_player_config(doc)
+def get_video_url(page):
+ player_config = get_player_config(page.scripts)
if not player_config:
- raise VideoUnavailable("Could not find video URL")
+ raise VideoUnavailable(page.unavailable_message or "Could not find video URL")
video_url, extension = get_best_video(player_config)
if not video_url:
return None, None
- title = doc.xpath("/html/head/title/text()")[0]
- filename = sanitize_filename(title)
+ filename = sanitize_filename(page.title)
filename += "." + extension
return video_url, filename
+class YouTubeVideoPageParser(html.parser.HTMLParser):
+ def __init__(self):
+ super().__init__()
+ self.title = None
+ self.unavailable_message = None
+ self.scripts = []
+
+ def handle_starttag(self, tag, attrs):
+ attrs = dict(attrs)
+ self._handle_title(tag, attrs)
+ self._handle_unavailable_message(tag, attrs)
+ self._handle_script(tag, attrs)
+
+ def handle_endtag(self, tag):
+ self.handle_data = self._ignore_data
+
+ def _ignore_data(self, _):
+ pass
+
+ def _handle_title(self, tag, attrs):
+ if tag == "title":
+ self.handle_data = self._handle_title_data
+
+ def _handle_title_data(self, data):
+ self.title = data.strip()
+
+ def _handle_unavailable_message(self, tag, attrs):
+ if attrs.get("id", None) == "unavailable-message":
+ self.handle_data = self._handle_unavailable_message_data
+
+ def _handle_unavailable_message_data(self, data):
+ self.unavailable_message = data.strip()
+
+ def _handle_script(self, tag, attrs):
+ if tag == "script":
+ self.handle_data = self._handle_script_data
+
+ def _handle_script_data(self, data):
+ if data:
+ self.scripts.append(data)
+
def write_video(filename, video_data):
- httpinfo = video_data.info()
- encoded_filename = urllib.quote(filename.encode("utf-8"))
- sys.stdout.write("Content-Disposition: attachment; filename*=UTF-8''%s\r\n" % encoded_filename)
- sys.stdout.write("Content-Length: %s\r\n" % httpinfo.getheader("Content-Length"))
- sys.stdout.write("\r\n")
- shutil.copyfileobj(video_data, sys.stdout)
+ quoted_filename = urllib.parse.quote(filename.encode("utf-8"))
+ sys.stdout.buffer.write(
+ b"Content-Disposition: attachment; filename*=UTF-8''{0}\r\n"
+ .replace(b"{0}", quoted_filename.encode("utf-8"))
+ )
+ sys.stdout.buffer.write(
+ b"Content-Length: {0}\r\n"
+ .replace(b"{0}", video_data.getheader("Content-Length").encode("utf-8"))
+ )
+ sys.stdout.buffer.write(b"\r\n")
+ shutil.copyfileobj(video_data, sys.stdout.buffer)
video_data.close()
def cgimain():
return
try:
- doc = parse_url(url)
- video_url, filename = get_video_url(doc)
+ page = YouTubeVideoPageParser()
+ parse_url(url, page)
+ video_url, filename = get_video_url(page)
video_data = urlopen(video_url)
- write_video(filename, video_data)
- except VideoUnavailable, e:
+ except VideoUnavailable as e:
print_form(
url=url,
- msg="<p class='error'>Sorry, there was an error: %s</p>" % cgi.escape(e.message)
+ msg="<p class='error'>Sorry, there was an error: %s</p>" % cgi.escape(e.args[0])
)
- except Exception, e:
+ except Exception as e:
print_form(
url=url,
msg="<p class='error'>Sorry, there was an error. Check your URL?</p>"
)
return
+ write_video(filename, video_data)
+
def pp_size(size):
suffixes = ["", "KiB", "MiB", "GiB"]
for i, suffix in enumerate(suffixes):
# Newline at the end
print_status()
- print
+ print()
def main():
try:
url = sys.argv[1]
except:
- print >>sys.stderr, "Usage: %s http://youtube.com/watch?v=FOOBAR" % sys.argv[0]
+ print("Usage: %s http://youtube.com/watch?v=FOOBAR" % sys.argv[0], file=sys.stderr)
sys.exit(1)
- doc = parse_url(url)
- video_url, filename = get_video_url(doc)
- print "Downloading", filename.encode("utf-8")
+ page = YouTubeVideoPageParser()
+ parse_url(url, page)
+ video_url, filename = get_video_url(page)
+ print("Downloading", filename)
- outfile = open(filename, "a")
+ outfile = open(filename, "ab")
offset = outfile.tell()
if offset > 0:
- print "Resuming download from", pp_size(offset)
+ print("Resuming download from", pp_size(offset))
total_size = None
while True:
try:
video_data = urlopen(video_url, offset)
- except urllib2.HTTPError, e:
+ except urllib.error.HTTPError as e:
if e.code == 416:
- print "File is complete!"
+ print("File is complete!")
break
else:
raise
- content_length = int(video_data.info().getheader("Content-Length"))
+ content_length = int(video_data.getheader("Content-Length"))
if total_size is None:
total_size = content_length
try:
copy_with_progress(content_length, video_data, outfile)
- except IOError, e:
- print
+ except IOError as e:
+ print()
video_data.close()
if outfile.tell() != total_size:
offset = outfile.tell()
if old_offset == offset:
time.sleep(1)
- print "Restarting download from", pp_size(offset)
+ print("Restarting download from", pp_size(offset))
else:
break
if __name__ == "__main__":
-### resource.setrlimit(resource.RLIMIT_AS, (MAX_MEMORY_BYTES, MAX_MEMORY_BYTES))
- if os.environ.has_key("SCRIPT_NAME"):
+ if "SCRIPT_NAME" in os.environ:
cgimain()
else:
try:
main()
except KeyboardInterrupt:
- print "\nExiting..."
+ print("\nExiting...")
sys.exit(1)