import urllib.request
-MAX_MEMORY_BYTES = 128 * 1024*1024
-USER_AGENT = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1"
+MOZILLA_RELEASE_URL = "https://www.mozilla.org/en-US/firefox/releases/"
+USER_AGENT_TEMPLATE = "Mozilla/5.0 (X11; Linux x86_64; rv:83.0) Gecko/20100101 Firefox/%s"
MIMETYPES = {
"video/mp4": "mp4",
cookiejar = http.cookiejar.CookieJar()
urlopener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cookiejar))
referrer = ""
+user_agent = None
def urlopen(url, offset=None):
+ global user_agent
+ if not user_agent:
+ page = MozillaReleasesPageParser()
+ with urllib.request.urlopen(MOZILLA_RELEASE_URL) as f:
+ page.feed(f.read().decode("utf-8"))
+ page.close()
+ user_agent = USER_AGENT_TEMPLATE % page.latest_release
+
if url.startswith("//"):
url = "https:" + url
if not url.startswith("http://") and not url.startswith("https://"):
else:
req.add_header("Referer", referrer)
- req.add_header("User-Agent", USER_AGENT)
+ req.add_header("User-Agent", user_agent)
if offset:
req.add_header("Range", "bytes=%d-" % offset)
def validate_url(url):
    """Check that *url* is an HTTPS link to a recognised YouTube host.

    The host is accepted when, after removing one optional leading "www."
    and then one optional leading "m.", it is exactly "youtube.com" or
    "youtu.be".

    Returns:
        None when the URL is acceptable.

    Raises:
        NotYouTube: if the scheme is not "https" or the host is not one of
            the recognised hosts.
    """
    parsed_url = urllib.parse.urlparse(url)
    scheme_ok = parsed_url.scheme == "https"

    # str.lstrip() takes a *set of characters*, not a prefix, so
    # lstrip("www.") would also let through hosts such as
    # "wwww.youtube.com" or ".youtube.com".  Remove exact prefixes instead.
    host = parsed_url.netloc
    for prefix in ("www.", "m."):
        if host.startswith(prefix):
            host = host[len(prefix):]
    host_ok = host in ("youtube.com", "youtu.be")

    if scheme_ok and host_ok:
        return
    raise NotYouTube()
-def parse_url(url, parser):
+def load_parse_url(url, parser):
f = urlopen(url)
parser.feed(f.read().decode("utf-8"))
parser.close()
return url
def get_player_config(scripts):
    """Collect the JSON configuration objects embedded in page scripts.

    Every line of every script is scanned for the object passed to
    ``ytcfg.set(...)`` and the object assigned to
    ``ytInitialPlayerResponse``.  Each object found is merged into one
    dict; keys from later matches override earlier ones.

    Args:
        scripts: iterable of script source strings.

    Returns:
        dict holding the merged objects; empty when nothing matched.

    Raises:
        json.JSONDecodeError: if a matched line does not hold a valid JSON
            object at the expected position.
    """
    # (text introducing the object, text that must follow it on the line).
    # Each marker ends with the object's opening '{"'.
    config_markers = [
        ("ytcfg.set({\"", "});"),
        ("ytInitialPlayerResponse = {\"", "};"),
    ]
    decoder = json.JSONDecoder()
    player_config = {}
    for script in scripts:
        for line in script.split("\n"):
            for marker, terminator in config_markers:
                marker_pos = line.find(marker)
                if marker_pos < 0:
                    continue
                # Step back over '{"' so decoding starts on the brace.
                start = marker_pos + len(marker) - 2
                # Lines lacking the terminator are skipped.
                if line.find(terminator, start) < 0:
                    continue
                # Let the decoder locate the end of the object: cutting at
                # the first terminator truncates the JSON whenever that
                # text also occurs inside a string value.
                obj, _ = decoder.raw_decode(line, start)
                player_config.update(obj)
    return player_config
def extract_js(script):
PREFIX = "var _yt_player={};(function(g){var window=this;"
return script[len(PREFIX):-len(SUFFIX)]
def find_cipher_func(script):
    """Return the name of the function called on a ``.s`` attribute.

    Searches *script* for the first call shaped like ``NAME(x.s)`` or
    ``NAME(decodeURIComponent(x.s)`` that is followed by ``,``, ``;`` or
    ``)``, and returns ``NAME``.

    Args:
        script: JavaScript source text to search.

    Returns:
        The matched function name as a string.

    Raises:
        Exception: if no such call is present in *script*.
    """
    FUNC_NAME = R"([a-zA-Z0-9$]+)"
    DECODE_URI_COMPONENT = R"(\(decodeURIComponent)?"
    FUNC_PARAMS = R"(\([a-zA-Z,\.]+\.s\))"
    TERMINATOR = R"[,;\)]"
    PATTERN = FUNC_NAME + DECODE_URI_COMPONENT + FUNC_PARAMS + TERMINATOR

    match = re.search(PATTERN, script)
    if match is None:
        # Fail with a clear message rather than an AttributeError on None.
        raise Exception("Could not find cipher function in player script!")
    return match.group(1)
-def decode_signature(js_url, signature):
def construct_url_from_cipher_result(cipher_result):
    """Build a URL from the mapping produced by the cipher function.

    The first string value starting with "https://" is used as the base
    URL.  The first dict value supplies query parameters; each of its
    values is percent-decoded before being handed, together with the base
    URL, to ``append_to_qs``.

    Args:
        cipher_result: mapping whose values are inspected in order.

    Returns:
        Whatever ``append_to_qs`` returns for the base URL and parameters.

    Raises:
        Exception: if no "https://" string value or no dict value exists.
    """
    base_url = None
    for value in cipher_result.values():
        if isinstance(value, str) and value.startswith("https://"):
            base_url = value
            break
    if base_url is None:
        raise Exception("Could not find URL-like string in cipher result!")

    for value in cipher_result.values():
        if not isinstance(value, dict):
            continue
        decoded = {
            name: urllib.parse.unquote(raw)
            for name, raw in value.items()
        }
        return append_to_qs(base_url, decoded)

    raise Exception("Could not find params-like structure in cipher result!")
+def decode_cipher_url(js_url, cipher):
+ cipher = urllib.parse.parse_qs(cipher)
+ args = [
+ cipher["url"][0],
+ cipher["sp"][0],
+ cipher["s"][0],
+ ]
+
f = urlopen(js_url)
script = f.read().decode("utf-8")
f.close()
- func_name = find_func_name(script)
+ cipher_func_name = find_cipher_func(script)
params = {
- "func_name": func_name,
- "signature": json.dumps(signature),
+ "cipher_func_name": cipher_func_name,
+ "args": json.dumps(args),
"code": json.dumps(extract_js(script)),
}
p = subprocess.Popen(
js_decode_script = ("""
const vm = require('vm');
- const sandbox = {
- location: {
- hash: '',
- href: '',
- protocol: 'http:'
- },
- history: {
- pushState: function(){}
- },
- document: {},
- navigator: {
- userAgent: ''
- },
- XMLHttpRequest: class XMLHttpRequest {},
- matchMedia: () => ({matches: () => {}, media: ''}),
- signature: %(signature)s,
- transformed_signature: null,
- g: function(){} // this is _yt_player
+ const fakeGlobal = {};
+ fakeGlobal.window = fakeGlobal;
+ fakeGlobal.location = {
+ hash: '',
+ host: 'www.youtube.com',
+ hostname: 'www.youtube.com',
+ href: 'https://www.youtube.com',
+ origin: 'https://www.youtube.com',
+ pathname: '/',
+ protocol: 'https:'
};
- sandbox.window = sandbox;
+ fakeGlobal.history = {
+ pushState: function(){}
+ };
+ fakeGlobal.document = {
+ location: fakeGlobal.location
+ };
+ fakeGlobal.document = {};
+ fakeGlobal.navigator = {
+ userAgent: ''
+ };
+ fakeGlobal.XMLHttpRequest = class XMLHttpRequest {};
+ fakeGlobal.matchMedia = () => ({matches: () => {}, media: ''});
+ fakeGlobal.result = null;
+ fakeGlobal.g = function(){}; // this is _yt_player
+ fakeGlobal.TimeRanges = function(){};
const code_string = %(code)s + ';';
- const exec_string = 'transformed_signature = %(func_name)s("", "MARKER", signature);';
- vm.runInNewContext(code_string + exec_string, sandbox);
-
- function findSignature(obj) {
- if (typeof obj !== 'object') {
- return;
- }
- for (const [key, value] of Object.entries(obj)) {
- if (key === 'MARKER') {
- return value;
- }
- const result = findSignature(value);
- if (result) {
- return result;
- }
- }
- }
- console.log(findSignature(sandbox.transformed_signature));
+ const exec_string = 'result = %(cipher_func_name)s(...%(args)s);';
+ vm.runInNewContext(code_string + exec_string, fakeGlobal);
+
+ console.log(JSON.stringify(fakeGlobal.result));
""" % params)
p.stdin.write(js_decode_script.encode("utf-8"))
p.stdin.close()
- transformed_signature = p.stdout.read().decode("utf-8").strip()
+ result = json.load(p.stdout)
if p.wait() != 0:
raise Exception("js failed to execute: %d" % p.returncode)
- return transformed_signature
+ result_url = construct_url_from_cipher_result(result)
+ return result_url
def get_best_video(player_config):
- url_data_list = player_config["args"]["url_encoded_fmt_stream_map"].split(",")
- js_url = player_config["assets"]["js"]
+ formats = player_config["streamingData"]["formats"]
best_url = None
best_quality = None
best_extension = None
- for url_data in url_data_list:
- url_data = urllib.parse.parse_qs(url_data)
- mimetype = url_data["type"][0].split(";")[0]
- quality = url_data["quality"][0]
+ for format_data in formats:
+ mimetype = format_data["mimeType"].split(";")[0]
+ quality = format_data["quality"]
- if "stereo3d" in url_data:
- continue
if quality not in QUALITIES:
continue
if mimetype not in MIMETYPES:
if best_quality is not None and quality < best_quality:
continue
- video_url = url_data["url"][0]
- if "sig" in url_data:
- signature = url_data["sig"][0]
- elif "s" in url_data:
- signature = decode_signature(js_url, url_data["s"][0])
+ if "signatureCipher" in format_data:
+ js_url = player_config["PLAYER_JS_URL"]
+ video_url = decode_cipher_url(js_url, format_data["signatureCipher"])
else:
- signature = None
-
- if signature:
- video_url = append_to_qs(video_url, {"signature": signature})
+ video_url = format_data["url"]
best_url = video_url
best_quality = quality
if not video_url:
return None, None
- filename = sanitize_filename(page.title)
- filename += "." + extension
+ title = player_config["videoDetails"].get("title", None)
+ if not title:
+ title = "Unknown title"
+
+ filename = sanitize_filename(title) + "." + extension
return video_url, filename
class YouTubeVideoPageParser(html.parser.HTMLParser):
def __init__(self):
super().__init__()
- self.title = None
self.unavailable_message = None
self.scripts = []
def handle_starttag(self, tag, attrs):
attrs = dict(attrs)
- self._handle_title(tag, attrs)
self._handle_unavailable_message(tag, attrs)
self._handle_script(tag, attrs)
def _ignore_data(self, _):
pass
- def _handle_title(self, tag, attrs):
- if tag == "title":
- self.handle_data = self._handle_title_data
-
- def _handle_title_data(self, data):
- self.title = data.strip()
-
def _handle_unavailable_message(self, tag, attrs):
if attrs.get("id", None) == "unavailable-message":
self.handle_data = self._handle_unavailable_message_data
if data:
self.scripts.append(data)
class MozillaReleasesPageParser(html.parser.HTMLParser):
    """HTML parser that records the ``data-latest-firefox`` attribute.

    ``latest_release`` starts out as "1.0" and is replaced by the value of
    every start tag that carries a non-empty ``data-latest-firefox``
    attribute, so the last such tag fed to the parser wins.
    """

    def __init__(self):
        super().__init__()
        # Fallback used when no tag carries the attribute.
        self.latest_release = "1.0"

    def handle_starttag(self, tag, attrs):
        release = dict(attrs).get("data-latest-firefox")
        if release:
            self.latest_release = release
+
def write_video(filename, video_data):
quoted_filename = urllib.parse.quote(filename.encode("utf-8"))
sys.stdout.buffer.write(
try:
page = YouTubeVideoPageParser()
validate_url(url)
- parse_url(url, page)
+ with urlopen(url) as f:
+ page.feed(f.read().decode("utf-8"))
+ page.close()
video_url, filename = get_video_url(page)
video_data = urlopen(video_url)
except VideoUnavailable as e:
sys.exit(1)
page = YouTubeVideoPageParser()
- parse_url(url, page)
+ with urlopen(url) as f:
+ page.feed(f.read().decode("utf-8"))
+ page.close()
video_url, filename = get_video_url(page)
print("Downloading", filename)