+++ /dev/null
-#!/usr/bin/env python
-# Copyright 2009 James Bunton <jamesbunton@fastmail.fm>
-# Licensed for distribution under the GPL version 2, check COPYING for details
-
-import logging
-import os
-import shutil
-import sys
-import urllib
-
-from Foundation import *
-
-
-def read_plist(filename):
- try:
- data = buffer(open(filename).read())
- except IOError:
- return None
- plist, fmt, err = NSPropertyListSerialization.propertyListFromData_mutabilityOption_format_errorDescription_(data, NSPropertyListMutableContainers, None, None)
- if err is not None:
- errStr = err.encode("utf-8")
- err.release() # Doesn't follow Cocoa conventions for some reason
- raise TypeError(errStr)
- return plist
-
-class Playlist(NSObject):
- def init(self):
- return self
-
- def set(self, name, pid, tracks, parent):
- self.name = name
- self.pid = pid
- self.children = []
- self.tracks = tracks
- self.parent = parent
- if parent is not None:
- parent.children.append(self)
-
-class ITunesLibrary(NSObject):
- def load_(self, filename):
- if filename is None:
- filename = "~/Music/iTunes/iTunes Music Library.xml"
- filename = os.path.expanduser(filename)
- yield "Reading library..."
- plist = read_plist(os.path.expanduser(filename))
- self.folder = self.loc2name(plist["Music Folder"])
- pl_tracks = plist["Tracks"]
- self.playlists = {}
- for pl_playlist in plist["Playlists"]:
- playlist = self.make_playlist(pl_playlist, pl_tracks)
- yield "Read playlist: " + playlist.name
- self.playlists[playlist.pid] = playlist
-
- def loc2name(self, location):
- return urllib.splithost(urllib.splittype(urllib.unquote(location))[1])[1]
-
- def make_playlist(self, pl_playlist, pl_tracks):
- name = pl_playlist["Name"]
- pid = pl_playlist["Playlist Persistent ID"]
- parent = None
- try:
- parent_pid = pl_playlist["Parent Persistent ID"]
- parent = self.playlists.get(parent_pid)
- except KeyError:
- pass
- tracks = []
- for item in pl_playlist.get("Playlist Items", []):
- trackID = item["Track ID"]
- filename = str(pl_tracks[str(trackID)]["Location"])
- filename = self.loc2name(filename)
- filename = filename.decode("utf-8")
- if not filename.startswith(self.folder):
- logging.warn("Skipping: " + filename)
- continue
- filename = strip_prefix(filename, self.folder)
- tracks.append(filename)
- playlist = Playlist.alloc().init()
- playlist.set(name, pid, tracks, parent)
- return playlist
-
- def has_playlist_name(self, name):
- for p in self.get_playlists():
- if p.name == name:
- return True
- return False
-
- def get_playlist_name(self, name):
- for playlist in self.get_playlists():
- if playlist.name == name:
- return playlist
-
- def get_playlist_pid(self, pid):
- for playlist in self.get_playlists():
- if playlist.pid == pid:
- return playlist
-
- def get_playlists(self):
- return self.playlists.values()
-
- def outlineView_numberOfChildrenOfItem_(self, view, item):
- if item == None:
- return len(self.playlists)
- else:
- return 0
-
- def outlineView_isItemExpandable_(self, view, item):
- return False
-
- def outlineView_child_ofItem_(self, view, index, item):
- if item == None:
- return self.playlists[index]
- else:
- return None
-
- def outlineView_objectValueForTableColumn_byItem_(self, view, column, item):
- return item.name
-
-
-encoded_names = {}
-valid_chars = frozenset("\\/-_.() abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
-def encode_filename(filename):
- try:
- return encoded_names[filename]
- except KeyError:
- pass
- orig_filename = filename
- filename = filename.encode("ascii", "ignore")
- filename = "".join(c for c in filename if c in valid_chars)
- if filename in encoded_names:
- a, b = os.path.splitext(filename)
- a += "-dup"
- filename = a + b
- encoded_names[orig_filename] = filename
- return filename
-
-def strip_prefix(s, prefix):
- assert s.startswith(prefix)
- s = s[len(prefix):]
- if s.startswith("/"):
- s = s[1:]
- return s
-
-def mkdirhier(path):
- if os.path.isdir(path):
- return
- paths = [path]
- while path != "/":
- path = os.path.split(path)[0]
- paths.append(path)
- for path in reversed(paths):
- try:
- os.mkdir(path)
- except OSError:
- pass
-
-def export_m3u(dry_run, dest, path_prefix, playlist_name, files):
- if dry_run:
- return
- if not path_prefix:
- path_prefix = "../"
- playlist_file = os.path.join(dest, "-Playlists-", playlist_name) + ".m3u"
- mkdirhier(os.path.dirname(playlist_file))
- logging.info("Writing: " + playlist_file)
- f = open(playlist_file, "w")
- for filename in files:
- if path_prefix.find("\\") > 0:
- filename = filename.replace("/", "\\")
- filename = encode_filename(filename)
- f.write("%s%s\n" % (path_prefix, filename))
- f.close()
-
-def sync(dry_run, source, dest, files_to_copy):
- join = os.path.join
-
- logging.info("Calculating files to sync and deleting old files")
- source = source.encode("utf-8")
- dest = dest.encode("utf-8")
- filemap = {}
- class SyncFile(object): pass
- for f in files_to_copy:
- sf = SyncFile()
- sf.orig_filename = f.encode("utf-8")
- sf.encoded_filename = encode_filename(f)
- filemap[sf.encoded_filename.lower()] = sf
- files_to_copy = set(filemap)
-
- for dirpath, dirnames, filenames in os.walk(dest):
- full_dirpath = dirpath
- dirpath = strip_prefix(dirpath, dest)
-
- for filename in filenames:
- filename = join(dirpath, filename)
-
- # Whenever 'file' is deleted OSX will helpfully remove '._file'
- if not os.path.exists(join(dest, filename)):
- continue
-
- if filename.lower() in files_to_copy:
- source_filename = filemap[filename.lower()].orig_filename
- sourcestat = os.stat(join(source, source_filename))
- deststat = os.stat(join(dest, filename))
- same_time = abs(sourcestat.st_mtime - deststat.st_mtime) < 5
- same_size = sourcestat.st_size == deststat.st_size
- if same_time and same_size:
- files_to_copy.remove(filename.lower())
- yield "Keep: " + filename
- else:
- yield "Update: " + filename
-
- elif not filename.startswith("-Playlists-"):
- yield "Delete: " + filename
- if not dry_run:
- os.unlink(join(dest, filename))
-
- if len(os.listdir(full_dirpath)) == 0:
- yield "Delete: " + dirpath
- if not dry_run:
- os.rmdir(full_dirpath)
-
-
- logging.info("Copying new files")
- files_to_copy = list(files_to_copy)
- files_to_copy.sort()
- for filename in files_to_copy:
- yield "Copy: " + filemap[filename].orig_filename
- if not dry_run:
- mkdirhier(os.path.dirname(join(dest, filename)))
- shutil.copy2(
- join(source, filemap[filename].orig_filename),
- join(dest, filemap[filename].encoded_filename)
- )
-
-