code.delx.au - youtube-cgi/blobdiff - youtube.cgi
Fix for Google changes
[youtube-cgi] / youtube.cgi
index b94febfdfb4f549f411477bced8349183961eb5f..c9937b72da7d3a1f352c1ab31747998cb4211748 100755 (executable)
@@ -15,8 +15,7 @@ import urllib.parse
 import urllib.request
 
 
-MAX_MEMORY_BYTES = 128 * 1024*1024
-USER_AGENT = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1"
+USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64; rv:67.0) Gecko/20100101 Firefox/67.0"
 
 MIMETYPES = {
     "video/mp4": "mp4",
@@ -36,8 +35,11 @@ QUALITIES = {
 class VideoUnavailable(Exception):
     pass
 
+class NotYouTube(Exception):
+    pass
+
 def print_form(url="", msg=""):
-    script_url = "http://%s%s" % (os.environ["HTTP_HOST"], os.environ["REQUEST_URI"])
+    script_url = "https://%s%s" % (os.environ["HTTP_HOST"], os.environ["REQUEST_URI"])
     sys.stdout.write("Content-Type: text/html\r\n\r\n")
     sys.stdout.write("""
 <!DOCTYPE html>
@@ -103,6 +105,16 @@ def urlopen(url, offset=None):
         assert start == offset
     return res
 
+def validate_url(url):
+    """Raise NotYouTube unless url is https on youtube.com or youtu.be."""
+    parsed_url = urllib.parse.urlparse(url)
+    host = parsed_url.netloc
+    # Drop one literal "www." (lstrip("www.") also ate the "w" of "wyoutube.com").
+    if host.startswith("www."):
+        host = host[len("www."):]
+    if parsed_url.scheme != "https" or host not in ("youtube.com", "youtu.be"):
+        raise NotYouTube()
+
 def parse_url(url, parser):
     f = urlopen(url)
     parser.feed(f.read().decode("utf-8"))
@@ -138,9 +150,10 @@ def extract_js(script):
 
 def find_func_name(script):
     FUNC_NAME = R"([a-zA-Z0-9$]+)"
+    DECODE_URI_COMPONENT = R"(\(decodeURIComponent)?"
     FUNC_PARAMS = R"(\([a-zA-Z,\.]+\.s\))"
     TERMINATOR = R"[,;\)]"
-    PATTERN = FUNC_NAME + FUNC_PARAMS + TERMINATOR
+    PATTERN = FUNC_NAME + DECODE_URI_COMPONENT + FUNC_PARAMS + TERMINATOR
 
     match = re.search(PATTERN, script)
     func_name = match.groups()[0]
@@ -190,49 +203,36 @@ def decode_signature(js_url, signature):
         sandbox.window = sandbox;
 
         const code_string = %(code)s + ';';
-        const exec_string = 'transformed_signature = %(func_name)s("", "MARKER", signature);';
+        const exec_string = 'transformed_signature = %(func_name)s(signature);';
         vm.runInNewContext(code_string + exec_string, sandbox);
 
-        function findSignature(obj) {
-            if (typeof obj !== 'object') {
-                return;
-            }
-            for (const [key, value] of Object.entries(obj)) {
-                if (key === 'MARKER') {
-                    return value;
-                }
-                const result = findSignature(value);
-                if (result) {
-                    return result;
-                }
-            }
-        }
-        console.log(findSignature(sandbox.transformed_signature));
+        console.log(sandbox.transformed_signature);
     """ % params)
 
     p.stdin.write(js_decode_script.encode("utf-8"))
     p.stdin.close()
 
     transformed_signature = p.stdout.read().decode("utf-8").strip()
+    transformed_signature = urllib.parse.unquote(transformed_signature)
     if p.wait() != 0:
         raise Exception("js failed to execute: %d" % p.returncode)
 
     return transformed_signature
 
 def get_best_video(player_config):
-    url_data_list = player_config["args"]["url_encoded_fmt_stream_map"].split(",")
     js_url = player_config["assets"]["js"]
 
+    player_args = player_config["args"]
+    player_response = json.loads(player_args["player_response"])
+    formats = player_response["streamingData"]["formats"]
+
     best_url = None
     best_quality = None
     best_extension = None
-    for url_data in url_data_list:
-        url_data = urllib.parse.parse_qs(url_data)
-        mimetype = url_data["type"][0].split(";")[0]
-        quality = url_data["quality"][0]
+    for format_data in formats:
+        mimetype = format_data["mimeType"].split(";")[0]
+        quality = format_data["quality"]
 
-        if "stereo3d" in url_data:
-            continue
         if quality not in QUALITIES:
             continue
         if mimetype not in MIMETYPES:
@@ -244,16 +244,17 @@ def get_best_video(player_config):
         if best_quality is not None and quality < best_quality:
             continue
 
-        video_url = url_data["url"][0]
-        if "sig" in url_data:
-            signature = url_data["sig"][0]
-        elif "s" in url_data:
-            signature = decode_signature(js_url, url_data["s"][0])
+        if "cipher" in format_data:
+            cipher = urllib.parse.parse_qs(format_data["cipher"])
+            video_url = cipher["url"][0]
+            if "sig" in cipher:
+                signature = cipher["sig"][0]
+            elif "s" in cipher:
+                signature = decode_signature(js_url, cipher["s"][0])
+            sp = cipher.get("sp", ["signature"])[0]
+            video_url = append_to_qs(video_url, {sp: signature})
         else:
-            signature = None
-
-        if signature:
-            video_url = append_to_qs(video_url, {"signature": signature})
+            video_url = format_data["url"]
 
         best_url = video_url
         best_quality = quality
@@ -278,21 +279,24 @@ def get_video_url(page):
     if not video_url:
         return None, None
 
-    filename = sanitize_filename(page.title)
-    filename += "." + extension
+    title = player_config["args"].get("title", None)
+    if not title:
+        title = json.loads(player_config["args"]["player_response"])["videoDetails"]["title"]
+    if not title:
+        title = "Unknown title"
+
+    filename = sanitize_filename(title) + "." + extension
 
     return video_url, filename
 
 class YouTubeVideoPageParser(html.parser.HTMLParser):
     def __init__(self):
         super().__init__()
-        self.title = None
         self.unavailable_message = None
         self.scripts = []
 
     def handle_starttag(self, tag, attrs):
         attrs = dict(attrs)
-        self._handle_title(tag, attrs)
         self._handle_unavailable_message(tag, attrs)
         self._handle_script(tag, attrs)
 
@@ -302,13 +306,6 @@ class YouTubeVideoPageParser(html.parser.HTMLParser):
     def _ignore_data(self, _):
         pass
 
-    def _handle_title(self, tag, attrs):
-        if tag == "title":
-            self.handle_data = self._handle_title_data
-
-    def _handle_title_data(self, data):
-        self.title = data.strip()
-
     def _handle_unavailable_message(self, tag, attrs):
         if attrs.get("id", None) == "unavailable-message":
             self.handle_data = self._handle_unavailable_message_data
@@ -343,11 +340,12 @@ def cgimain():
     try:
         url = args["url"][0]
     except:
-        print_form(url="http://www.youtube.com/watch?v=FOOBAR")
+        print_form(url="https://www.youtube.com/watch?v=FOOBAR")
         return
 
     try:
         page = YouTubeVideoPageParser()
+        validate_url(url)
         parse_url(url, page)
         video_url, filename = get_video_url(page)
         video_data = urlopen(video_url)
@@ -356,10 +354,15 @@ def cgimain():
             url=url,
             msg="<p class='error'>Sorry, there was an error: %s</p>" % cgi.escape(e.args[0])
         )
+    except NotYouTube:
+        print_form(
+            url=url,
+            msg="<p class='error'>Sorry, that does not look like a YouTube page!</p>"
+        )
     except Exception as e:
         print_form(
             url=url,
-            msg="<p class='error'>Sorry, there was an error. Check your URL?</p>"
+            msg="<p class='error'>Sorry, there was an unknown error.</p>"
         )
         return
 
@@ -411,7 +414,7 @@ def main():
     try:
         url = sys.argv[1]
     except:
-        print("Usage: %s http://youtube.com/watch?v=FOOBAR" % sys.argv[0], file=sys.stderr)
+        print("Usage: %s https://youtube.com/watch?v=FOOBAR" % sys.argv[0], file=sys.stderr)
         sys.exit(1)
 
     page = YouTubeVideoPageParser()