]> code.delx.au - notipod/blobdiff - libsyncitunes.py
Renamed stuff
[notipod] / libsyncitunes.py
diff --git a/libsyncitunes.py b/libsyncitunes.py
new file mode 100644 (file)
index 0000000..ea37541
--- /dev/null
@@ -0,0 +1,170 @@
+#!/usr/bin/env python
+# Copyright 2009 James Bunton <jamesbunton@fastmail.fm>
+# Licensed for distribution under the GPL version 2, check COPYING for details
+
+import logging
+import os
+import shutil
+import urllib
+
+from Foundation import *
+
+
+def read_plist(filename):
+       try:
+               data = buffer(open(filename).read())
+       except IOError:
+               return None
+       plist, fmt, err = NSPropertyListSerialization.propertyListFromData_mutabilityOption_format_errorDescription_(data, NSPropertyListMutableContainers, None, None)
+       if err is not None:
+               errStr = err.encode("utf-8")
+               err.release() # Doesn't follow Cocoa conventions for some reason
+               raise TypeError(errStr)
+       return plist
+
+class Playlist(object):
+       def __init__(self, name, tracks, parent=None):
+               self.name = name
+               self.children = []
+               self.tracks = tracks
+               if parent is not None:
+                       parent.children.append(self)
+
+class Library(NSObject):
+       def init(self):
+               return self.initWithFilename_("~/Music/iTunes/iTunes Music Library.xml")
+
+       def initWithFilename_(self, filename):
+               filename = os.path.expanduser(filename)
+               plist = read_plist(os.path.expanduser(filename))
+               self.folder = self.loc2name(plist["Music Folder"])
+               pl_tracks = plist["Tracks"]
+               self.playlists = []
+               for pl_playlist in plist["Playlists"]:
+                       self.playlists.append(self.make_playlist(pl_playlist, pl_tracks))
+               return self
+
+       def loc2name(self, location):
+               return urllib.splithost(urllib.splittype(urllib.unquote(location))[1])[1]
+       
+       def make_playlist(self, pl_playlist, pl_tracks):
+               name = pl_playlist["Name"]
+               tracks = []
+               for item in pl_playlist.get("Playlist Items", []):
+                       trackID = item["Track ID"]
+                       filename = str(pl_tracks[str(trackID)]["Location"])
+                       filename = self.loc2name(filename)
+                       filename = filename[len(self.folder):]
+                       filename = eval(repr(filename).lstrip("u")).decode("utf-8")
+                       tracks.append(filename)
+               return Playlist(name, tracks)
+
+       def has_playlist(self, playlist):
+               for p in self.playlists:
+                       if p.name == playlist:
+                               return True
+               return False
+
+       def get_playlist(self, name):
+               playlist = [p for p in self.playlists if p.name == name][0]
+               return playlist.tracks
+
+       def list_playlists(self):
+               return [p.name for p in self.playlists]
+
+       def outlineView_numberOfChildrenOfItem_(self, view, item):
+               if item == None:
+                       return len(self.playlists)
+               else:
+                       return 0
+
+       def outlineView_isItemExpandable_(self, view, item):
+               return False
+
+       def outlineView_child_ofItem_(self, view, index, item):
+               if item == None:
+                       return self.playlists[index]
+               else:
+                       return None
+
+       def outlineView_objectValueForTableColumn_byItem_(self, view, column, item):
+               return item.name
+
+
+def export_m3u(dry_run, dest, drive_letter, music_dir, playlist_name, files):
+       if dry_run:
+               return
+       f = open(os.path.join(dest, playlist_name) + ".m3u", "w")
+       for filename in files:
+               filename = filename.replace("/", "\\").encode("utf-8")
+               f.write("%s:\\%s\\%s\n" % (drive_letter, music_dir, filename))
+       f.close()
+
+def strip_prefix(s, prefix):
+       assert s.startswith(prefix)
+       s = s[len(prefix):]
+       if s.startswith("/"):
+               s = s[1:]
+       return s
+
+def mkdirhier(path):
+       if os.path.isdir(path):
+               return
+       paths = [path]
+       while path != "/":
+               path = os.path.split(path)[0]
+               paths.append(path)
+       for path in reversed(paths):
+               try:
+                       os.mkdir(path)
+               except OSError:
+                       pass
+
+def sync(dry_run, source, dest, files):
+       join = os.path.join
+
+       logging.info("Calculating files to sync and deleting old files")
+       files = set(files)
+       for dirpath, dirnames, filenames in os.walk(dest):
+               full_dirpath = dirpath
+               dirpath = strip_prefix(dirpath, dest)
+
+               for filename in filenames:
+                       filename = join(dirpath, filename).decode("utf-8")
+
+                       # Whenever 'file' is deleted OSX will helpfully remove '._file'
+                       if not os.path.exists(join(dest, filename)):
+                               continue
+
+                       if filename in files:
+                               sourcestat = os.stat(join(source, filename))
+                               deststat = os.stat(join(dest, filename))
+                               same_time = abs(sourcestat.st_mtime - deststat.st_mtime) < 5
+                               same_size = sourcestat.st_size == deststat.st_size
+                               if same_time and same_size:
+                                       files.remove(filename)
+                                       logging.debug("keep: " + filename)
+                               else:
+                                       logging.debug("update: " + filename)
+
+                       elif not filename.startswith("Playlists/"):
+                               logging.debug("delete: " + filename)
+                               if not dry_run:
+                                       os.unlink(join(dest, filename))
+
+               if len(os.listdir(full_dirpath)) == 0:
+                       logging.debug("rmdir: " + dirpath)
+                       if not dry_run:
+                               os.rmdir(full_dirpath)
+
+
+       logging.info("Copying new files")
+       files = list(files)
+       files.sort()
+       for filename in files:
+               logging.debug("copy: " + filename)
+               if not dry_run:
+                       mkdirhier(os.path.dirname(join(dest, filename)))
+                       shutil.copy2(join(source, filename), join(dest, filename))
+
+