]> code.delx.au - notipod/blobdiff - libnotipod.py
Moved stuff around
[notipod] / libnotipod.py
diff --git a/libnotipod.py b/libnotipod.py
new file mode 100644 (file)
index 0000000..80f6a0d
--- /dev/null
@@ -0,0 +1,233 @@
+#!/usr/bin/env python
+# Copyright 2009 James Bunton <jamesbunton@fastmail.fm>
+# Licensed for distribution under the GPL version 2, check COPYING for details
+
+import logging
+import os
+import shutil
+import sys
+import urllib
+
+from Foundation import *
+
+
+def read_plist(filename):
+       try:
+               data = buffer(open(filename).read())
+       except IOError:
+               return None
+       plist, fmt, err = NSPropertyListSerialization.propertyListFromData_mutabilityOption_format_errorDescription_(data, NSPropertyListMutableContainers, None, None)
+       if err is not None:
+               errStr = err.encode("utf-8")
+               err.release() # Doesn't follow Cocoa conventions for some reason
+               raise TypeError(errStr)
+       return plist
+
+class Playlist(NSObject):
+       def init(self):
+               return self
+
+       def set(self, name, pid, tracks, parent):
+               self.name = name
+               self.pid = pid
+               self.children = []
+               self.tracks = tracks
+               self.parent = parent
+               if parent is not None:
+                       parent.children.append(self)
+
+class ITunesLibrary(NSObject):
+       def load_(self, filename):
+               if filename is None:
+                       filename = "~/Music/iTunes/iTunes Music Library.xml"
+               filename = os.path.expanduser(filename)
+               yield "Reading library..."
+               plist = read_plist(os.path.expanduser(filename))
+               self.folder = self.loc2name(plist["Music Folder"])
+               pl_tracks = plist["Tracks"]
+               self.playlists = {}
+               for pl_playlist in plist["Playlists"]:
+                       playlist = self.make_playlist(pl_playlist, pl_tracks)
+                       yield "Read playlist: " + playlist.name
+                       self.playlists[playlist.pid] = playlist
+
+       def loc2name(self, location):
+               return urllib.splithost(urllib.splittype(urllib.unquote(location))[1])[1]
+       
+       def make_playlist(self, pl_playlist, pl_tracks):
+               name = pl_playlist["Name"]
+               pid = pl_playlist["Playlist Persistent ID"]
+               parent = None
+               try:
+                       parent_pid = pl_playlist["Parent Persistent ID"]
+                       parent = self.playlists.get(parent_pid)
+               except KeyError:
+                       pass
+               tracks = []
+               for item in pl_playlist.get("Playlist Items", []):
+                       trackID = item["Track ID"]
+                       filename = str(pl_tracks[str(trackID)]["Location"])
+                       filename = self.loc2name(filename)
+                       filename = filename.decode("utf-8")
+                       if not filename.startswith(self.folder):
+                               logging.warn("Skipping: " + filename)
+                               continue
+                       filename = strip_prefix(filename, self.folder)
+                       tracks.append(filename)
+               playlist = Playlist.alloc().init()
+               playlist.set(name, pid, tracks, parent)
+               return playlist
+
+       def has_playlist_name(self, name):
+               for p in self.get_playlists():
+                       if p.name == name:
+                               return True
+               return False
+
+       def get_playlist_name(self, name):
+               for playlist in self.get_playlists():
+                       if playlist.name == name:
+                               return playlist
+       
+       def get_playlist_pid(self, pid):
+               for playlist in self.get_playlists():
+                       if playlist.pid == pid:
+                               return playlist
+
+       def get_playlists(self):
+               return self.playlists.values()
+
+       def outlineView_numberOfChildrenOfItem_(self, view, item):
+               if item == None:
+                       return len(self.playlists)
+               else:
+                       return 0
+
+       def outlineView_isItemExpandable_(self, view, item):
+               return False
+
+       def outlineView_child_ofItem_(self, view, index, item):
+               if item == None:
+                       return self.playlists[index]
+               else:
+                       return None
+
+       def outlineView_objectValueForTableColumn_byItem_(self, view, column, item):
+               return item.name
+
+
+encoded_names = {}
+valid_chars = frozenset("\\/-_.() abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
+def encode_filename(filename):
+       try:
+               return encoded_names[filename]
+       except KeyError:
+               pass
+       orig_filename = filename
+       filename = filename.encode("ascii", "ignore")
+       filename = "".join(c for c in filename if c in valid_chars)
+       if filename in encoded_names:
+               a, b = os.path.splitext(filename)
+               a += "-dup"
+               filename = a + b
+       encoded_names[orig_filename] = filename
+       return filename
+
+def strip_prefix(s, prefix):
+       assert s.startswith(prefix)
+       s = s[len(prefix):]
+       if s.startswith("/"):
+               s = s[1:]
+       return s
+
+def mkdirhier(path):
+       if os.path.isdir(path):
+               return
+       paths = [path]
+       while path != "/":
+               path = os.path.split(path)[0]
+               paths.append(path)
+       for path in reversed(paths):
+               try:
+                       os.mkdir(path)
+               except OSError:
+                       pass
+
+def export_m3u(dry_run, dest, path_prefix, playlist_name, files):
+       if dry_run:
+               return
+       if not path_prefix:
+               path_prefix = "../"
+       playlist_file = os.path.join(dest, "-Playlists-", playlist_name) + ".m3u"
+       mkdirhier(os.path.dirname(playlist_file))
+       logging.info("Writing: " + playlist_file)
+       f = open(playlist_file, "w")
+       for filename in files:
+               if path_prefix.find("\\") > 0:
+                       filename = filename.replace("/", "\\")
+               filename = encode_filename(filename)
+               f.write("%s%s\n" % (path_prefix, filename))
+       f.close()
+
+def sync(dry_run, source, dest, files_to_copy):
+       join = os.path.join
+
+       logging.info("Calculating files to sync and deleting old files")
+       source = source.encode("utf-8")
+       dest = dest.encode("utf-8")
+       filemap = {}
+       class SyncFile(object): pass
+       for f in files_to_copy:
+               sf = SyncFile()
+               sf.orig_filename = f.encode("utf-8")
+               sf.encoded_filename = encode_filename(f)
+               filemap[sf.encoded_filename.lower()] = sf
+       files_to_copy = set(filemap)
+
+       for dirpath, dirnames, filenames in os.walk(dest):
+               full_dirpath = dirpath
+               dirpath = strip_prefix(dirpath, dest)
+
+               for filename in filenames:
+                       filename = join(dirpath, filename)
+
+                       # Whenever 'file' is deleted OSX will helpfully remove '._file'
+                       if not os.path.exists(join(dest, filename)):
+                               continue
+
+                       if filename.lower() in files_to_copy:
+                               source_filename = filemap[filename.lower()].orig_filename
+                               sourcestat = os.stat(join(source, source_filename))
+                               deststat = os.stat(join(dest, filename))
+                               same_time = abs(sourcestat.st_mtime - deststat.st_mtime) < 5
+                               same_size = sourcestat.st_size == deststat.st_size
+                               if same_time and same_size:
+                                       files_to_copy.remove(filename.lower())
+                                       yield "Keep: " + filename
+                               else:
+                                       yield "Update: " + filename
+
+                       elif not filename.startswith("-Playlists-"):
+                               yield "Delete: " + filename
+                               if not dry_run:
+                                       os.unlink(join(dest, filename))
+
+               if len(os.listdir(full_dirpath)) == 0:
+                       yield "Delete: " + dirpath
+                       if not dry_run:
+                               os.rmdir(full_dirpath)
+
+
+       logging.info("Copying new files")
+       files_to_copy = list(files_to_copy)
+       files_to_copy.sort()
+       for filename in files_to_copy:
+               yield "Copy: " + filemap[filename].orig_filename
+               if not dry_run:
+                       mkdirhier(os.path.dirname(join(dest, filename)))
+                       shutil.copy2(
+                               join(source, filemap[filename].orig_filename),
+                               join(dest, filemap[filename].encoded_filename)
+                       )
+
+