#!/usr/bin/env python3 from __future__ import division, absolute_import, print_function, unicode_literals import os import signal import sys #import mutagen import time #from path import path import musicman import mutagen #from musicman.utils.constants import ( # VERSION, # NO_TAGS #) #from musicman.utils import ( # parse_args, # load_config #) #from musicman.utils.metadata import MetaTag def spinning_cursor(): while True: for cursor in '|/-\\': yield cursor spinner = spinning_cursor() def supports_color(): """ Returns True if the running system's terminal supports color, and False otherwise. """ plat = sys.platform supported_platform = plat != 'Pocket PC' and (plat != 'win32' or 'ANSICON' in os.environ) # isatty is not always implemented, #6223. is_a_tty = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty() if not supported_platform or not is_a_tty: return False return True def clearLine(): if supports_color(): sys.stdout.write('\033[K') sys.stdout.flush() def sanitize(text): newtext = text newtext = newtext.replace('<', '').replace('>', '').replace(':', '').replace('"', '').replace('|', '').replace('?', '').replace('*', '') newtext = newtext.replace('/', '-') newtext = newtext.strip() #if newtext != text: # clearLine() # print("text:", text) # print(" new:", newtext) return newtext def getLibrary(path, excludeDirs=[]): if os.path.isdir(path): for root, directories, filenames in sorted(os.walk(path)): if root not in excludeDirs: for filename in filenames: yield os.path.join(root, filename) # files=[] # # if os.path.isdir(path): # for libraryDir, artists, dummy in os.walk(path): # for artist in sorted(artists): # for artistDir, albums, dummy in os.walk(os.path.join(libraryDir, artist)): # if artistDir in excludeDirs: # #clearLine() # #print("Excluding:", artistDir) # #print() # continue # for album in sorted(albums): # for albumDir, dummy, songs in os.walk(os.path.join(artistDir, album)): # if albumDir in excludeDirs: # #clearLine() # #print("Excluding:", albumDir) # #print() # continue # #print("AlbumDir:", albumDir) # #print(" Artist:", artist) # #print(" Album:", album) # for song in songs: # print("Scanning", path, next(spinner), end="\r") # files.append(os.path.join(albumDir, song)) # #print("Path:", os.path.join(albumDir, song)) # clearLine() # print("Scan complete!") # return files def getEmptyDirs(path, excludeDirs=[]): if os.path.isdir(path): for root, directories, filenames in sorted(os.walk(path)): if root not in excludeDirs: if not directories and not filenames: yield root def getSong(file, orgDir=None, tmpDir=None, dstDir=None): from musicman.utils.metadata import MetaTag from musicman.utils.constants import INTERNAL_FORMATS global config global originDir global workingDir global targetDir global targetFormat song = {} orgDir = originDir if orgDir is None else orgDir tmpDir = workingDir if tmpDir is None else tmpDir dstDir = targetDir if dstDir is None else dstDir if (os.path.splitext(file)[1][1:] in INTERNAL_FORMATS): metadata = MetaTag(file) if metadata.tags.get("artist") is None or len(metadata.tags.get("artist")) < 1: return {'metadata': None} elif metadata.tags.get("albumartist") is None or len(metadata.tags.get("albumartist")) < 1: return {'metadata': None} elif metadata.tags.get('album') is None or len(metadata.tags.get("album")) < 1: return {'metadata': None} elif metadata.tags.get("musicbrainz_albumid") is None or len(metadata.tags.get("musicbrainz_albumid")) < 5: return {'metadata': None} else: song['metadata'] = metadata song['artistName'] = sanitize(metadata.tags["albumartist"]) song['albumName'] = sanitize(metadata.tags["album"]) song['titleName'] = sanitize(metadata.tags["title"]) if isinstance(metadata.tags.get("totaldiscs", 0), tuple): song['discnumber'] = int(metadata.tags.get('totaldiscs', 0)[0]) song['totaldiscs'] = int(metadata.tags.get("totaldiscs", 0)[1]) else: song['discnumber'] = int(metadata.tags.get('discnumber', 0)) song['totaldiscs'] = int(metadata.tags.get("totaldiscs", 0)) if isinstance(metadata.tags.get('totaltracks', 0), tuple): song['tracknumber'] = int(metadata.tags.get('totaltracks', 0)[0]) song['totaltracks'] = int(metadata.tags.get('totaltracks', 0)[1]) else: song['tracknumber'] = int(metadata.tags.get('tracknumber', 0)) song['totaltracks'] = int(metadata.tags.get('totaltracks', 0)) if song['totaldiscs'] > 1: song['outPath'] = os.path.join(song['artistName'], song['albumName']) song['outFile'] = '{0:d}-{1:02d}-{2}'.format(song['discnumber'], song['tracknumber'], song['titleName']) else: if metadata.tags.get("tracknumber") is not None: song['outPath'] = os.path.join(song['artistName'], song['albumName']) song['outFile'] = '{0:02d}-{1}'.format(song['tracknumber'], song['titleName']) else: song['outPath'] = os.path.join(song['artistName'], song['albumName']) outFile = '{0}'.format(song['titleName']) song['originFile'] = os.path.join(orgDir, song['outPath'], song['outFile']) song['workingFile'] = os.path.join(tmpDir, song['outPath'], song['outFile']) song['targetFile'] = os.path.join(dstDir, song['outPath'], song['outFile']) song['originExt'] = os.path.splitext(file)[1] song['targetExt'] = '.' + targetFormat return song else: clearLine() print("FATAL: File extension \"{0}\" is not supported.".format(os.path.splitext(file)[1])) print(" Supported:", ", ".join(INTERNAL_FORMATS)) sys.exit(2) def cleanLibrary(rootDir, excludeDirs=[], act=False, verbose=0): global config if excludeDirs is None: excludeDirs=[] print("Clean Dirs:", rootDir) #while getEmptyDirs(rootDir, excludeDirs) != [] and act == True and tries > 0: if act: try: while getEmptyDirs(rootDir, excludeDirs).__next__(): for path in getEmptyDirs(rootDir, excludeDirs): print("Removing:", path) if act: try: os.rmdir(path) except OSError as err: print("ERROR: Failed to remove directory:", err) sys.exit(5) except StopIteration: pass else: for path in getEmptyDirs(rootDir, excludeDirs): print("Empty:", path) #print("Processing Complete!") def renameLibrary(rootDir, excludeDirs=[], act=False, verbose=0): global config if excludeDirs is None: excludeDirs=[] print("Renaming:", rootDir) for file in getLibrary(rootDir, excludeDirs): if (os.path.isdir(os.path.dirname(file)) and os.path.isfile(file)): clearLine() print("Processing:", os.path.dirname(os.path.dirname(file)), next(spinner), end="\r") song = getSong(file, rootDir) if song['metadata'] is None: if verbose > 2: clearLine() print("Skipping: {0} due to lack of metadata".format(file)) else: if not os.path.isfile(song['originFile'] + song['originExt']): clearLine() print("Found:", file) if verbose > 0: print(" New:", song['originFile'] + song['originExt']) if (act): if verbose > 1: print("Renaming: \"{0}\" -> \"{1}\"".format(file, song['originFile'] + song['originExt'])) try: os.renames(file, song['originFile'] + song['originExt']) except OSError as err: print("ERROR: Failed to move:", err) sys.exit(5) clearLine() #print("Processing Complete!") def findUntagged(rootDir, excludeDirs=[], verbose=0): global config if excludeDirs is None: excludeDirs=[] print("Find Untagged:", rootDir) for file in getLibrary(rootDir, excludeDirs): if (os.path.isdir(os.path.dirname(file)) and os.path.isfile(file)): clearLine() print("Processing:", os.path.dirname(os.path.dirname(file)), next(spinner), end="\r") song = getSong(file) if song['metadata'] is None: clearLine() print("Untagged: {0}".format(file)) clearLine() #print("Processing Complete!") def findNew(rootDir, tmpDir, dstDir, dstFormat, excludeDirs=[], verbose=0): global config if excludeDirs is None: excludeDirs=[] print("Find New Media") for file in getLibrary(rootDir, excludeDirs): if (os.path.isdir(os.path.dirname(file)) and os.path.isfile(file)): clearLine() print("Processing:", os.path.dirname(os.path.dirname(file)), next(spinner), end="\r") song = getSong(file) if song['metadata'] is None: if verbose > 2: clearLine() print("Skipping: {0} due to lack of metadata".format(file)) else: if not (os.path.isfile(song['workingFile'] + song['targetExt']) or os.path.isfile(song['targetFile'] + song['targetExt'])): clearLine() print("New:", file) if verbose > 0: if not os.path.isfile(os.path.isfile(song['workingFile'] + song['targetExt'])): print(" No:", song['workingFile'] + song['targetExt']) elif not os.path.isfile(os.path.isfile(song['targetFile'] + song['targetExt'])): print(" No:", song['targetFile'] + song['targetExt']) #clearLine() #print("Processing Complete!") def syncWorking(tmpDir, excludeDirs=[], act=False, verbose=0): global config if excludeDirs is None: excludeDirs=[] print("Sync Target Media") for file in getLibrary(tmpDir, excludeDirs): if (os.path.isdir(os.path.dirname(file)) and os.path.isfile(file)): clearLine() print("Processing:", os.path.dirname(os.path.dirname(file)), next(spinner), end="\r") song = getSong(file) if song['metadata'] is None: if verbose > 2: clearLine() print("Skipping: {0} due to lack of metadata".format(file)) else: filename = os.path.basename(file) file_ext = os.path.splitext(filename)[1] if not os.path.isfile(song['targetDir'] + song['originExt']): print("Sync:", file) if verbose > 0: print(" To:", song['targetFile'], song['originExt']) if act: print("would've acted") #if not os.path.isfile(os.path.join(destDir, song['outPath'], song['outFile'] + file_ext)): # print("Sync:", file) # if verbose > 0: # print(" To:", os.path.join(destDir, song['outPath'], song['outFile'] + file_ext)) # # if act: # print("would've acted") #clearLine() #print("Processing Complete!") def convertMedia(rootDir, tmpDir, dstDir, dstFormat, excludeDirs=[], act=False, verbose=0): import subprocess from musicman.utils.copytags import ( copy_tags ) if excludeDirs is None: excludeDirs=[] print("Convert Library Media") for file in getLibrary(rootDir, excludeDirs): if (os.path.isdir(os.path.dirname(file)) and os.path.isfile(file)): clearLine() print("Processing:", os.path.dirname(os.path.dirname(file)), next(spinner), end="\r") song = getSong(file, rootDir, tmpDir, targetDir) if song['metadata'] is None: if verbose > 2: clearLine() print("Skipping: {0} due to lack of metadata".format(file)) else: if not (os.path.isfile(song['workingFile'] + song['targetExt']) or os.path.isfile(song['targetFile'] + song['targetExt'])): clearLine() print("New:", file) if verbose > 2: if not os.path.isfile(os.path.isfile(song['workingFile'] + song['targetExt'])): print(" No:", song['workingFile'] + song['targetExt']) elif not os.path.isfile(os.path.isfile(song['targetFile'] + song['targetExt'])): print(" No:", song['targetFile'] + song['targetExt']) if act: #print("would've acted") if not os.path.isdir(os.path.dirname(song['workingFile'])): os.makedirs(os.path.dirname(song['workingFile'])) #subprocess.call("ffmpeg", "-loglevel", "quiet", "-stats", "-i", file, "-vn", "-c:a", "libfdk_aac", "-vbr", "5", "-nostdin", "-y", song['workingFile'] + song['targetExt']) try: subprocess.call('/usr/bin/ffmpeg -loglevel quiet -stats -i "{0}" -vn -c:a libfdk_aac -vbr 5 -nostdin -y "{1}"'.format(file, song['workingFile'] + song['targetExt']), shell=True) #new = MetaData(song['workingFile'] + song['targetExt']) #new.update(song['metadata']) #new.save() #old = mutagen.File(file) #new = mutagen.File(song['workingFile'] + song['targetExt']) #new.update(old) #new.save() copy_tags(file, song['workingFile'] + song['targetExt']) except KeyboardInterrupt: if os.path.isfile(song['workingFile'] + song['targetExt']): os.remove(song['workingFile'] + song['targetExt']) clearLine() print(end='\r') print("Aborted by user") sys.exit(0) clearLine() if __name__ == '__main__': global opt global config global originPath, targetPath, targetFormat, workPath import configparser #opt, files = parse_args() opt = musicman.utils.parse_args() config = musicman.utils.load_config() print("opt:", opt) try: originDir = config['origin']['path'] if opt.originDir is None else opt.originDir except AttributeError: originDir = config['origin']['path'] try: targetDir = config['target']['path'] if opt.targetDir is None else opt.targetDir except AttributeError: targetDir = config['target']['path'] try: targetFormat = config['target']['format'] if opt.targetFormat is None else opt.targetFormat except AttributeError: targetFormat = config['target']['format'] try: workingDir = config['working']['path'] if opt.workingDir is None else opt.workingDir except AttributeError: workingDir = config['working']['path'] if opt.mode is None: print("ERROR: No command provided.") sys.exit(1) try: if opt.mode == 'clean': cleanLibrary(originDir, opt.excludeDirs, opt.act, opt.verbose) cleanLibrary(targetDir, opt.excludeDirs, opt.act, opt.verbose) cleanLibrary(workingDir, opt.excludeDirs, opt.act, opt.verbose) elif opt.mode == 'rename': renameLibrary(originDir, opt.excludeDirs, opt.act, opt.verbose) renameLibrary(targetDir, opt.excludeDirs, opt.act, opt.verbose) renameLibrary(workingDir, opt.excludeDirs, opt.act, opt.verbose) elif opt.mode == 'scan': if opt.scanMode is None: print("ERROR: Subcommand for scan not provided.") sys.exit(1) elif opt.scanMode == 'untagged': findUntagged(originDir, opt.excludeDirs, opt.verbose) findUntagged(targetDir, opt.excludeDirs, opt.verbose) findUntagged(workingDir, opt.excludeDirs, opt.verbose) elif opt.scanMode == 'new': findNew(originDir, workingDir, targetDir, targetFormat, opt.excludeDirs, opt.verbose) elif opt.mode == 'sync': syncWorking(workingDir, targetDir, opt.excludeDirs, opt.act, opt.verbose) elif opt.mode == 'convert': convertMedia(originDir, workingDir, targetDir, targetFormat, opt.excludeDirs, opt.act, opt.verbose) except KeyboardInterrupt: clearLine() print(end='\r') print("Aborted by user") sys.exit(0) clearLine() print("Processing Complete!") #for file in getLibrary(config['origin']['path']): # #print("File:", file) # if (os.path.isdir(os.path.dirname(file)) and os.path.isfile(file)): # clearLine() # print("Processing:", os.path.dirname(os.path.dirname(file)), next(spinner), end="\r") # #time.sleep(0.01) # # #print("Path:", os.path.dirname(file)) # #print("File:", os.path.basename(file)) #clearLine() #print("Processing Complete!") sys.exit(0) #config = configparser.ConfigParser() #config.read('library.ini') #print(config.sections()) #print(config["lossless"]["Path"]) #try: # #print("Test1:", config.get('lossless', 'test')) # config.get('lossless', 'path') # config.get('lossless', 'format') # config.get('converted', 'path') # config.get('converted', 'format') #except configparser.NoOptionError as err: # print("ERROR: Configuration of required settings are missing:", err) # sys.exit(1) LosslessLibraryRoot = '/srv/public/Music-Lossless' #LosslessLibraryRoot = '/srv/public/Music-iTunes' iTunesLibraryRoot = '/srv/public/Music-iTunes' #artistWalker = os.walk(LosslessLibraryRoot) #dest_dir, artists, files = artistWalker.next() #print "dest_dir: %s" % dest_dir #print "artists: %s" % artists #print "files: %s" % files #for artist in artists: # print "Artist: %s" % artist # albumWalker = os.walk(os.path.join(dest_dir, artist)) # artist_dir, albums, artist_files = albumWalker.next() # print "Albums: %s" % albums # print "Album Dir: %s" % artist_dir # for album in albums: # songWalker = os.walk(os.path.join(artist_dir, album) # album_dir, dummy, songs = songWalker.next() for libraryDir, artists, dummy in os.walk(config['origin']['path']): for artist in sorted(artists): for artistDir, albums, dummy in os.walk(os.path.join(libraryDir, artist)): for album in sorted(albums): for albumDir, dummy, songs in os.walk(os.path.join(artistDir, album)): #print("AlbumDir:", albumDir) #print(" Artist:", artist) #print(" Album:", album) for song in songs: if song.endswith('.flac'): #print " Song: %s" % song metadata = MetaTag(os.path.join(albumDir, song)) #if os.path.isfile(os.path.join() #print "MetaData: %s" % metadata.tags #os.path.join(iTunesLibraryRoot, [metadata.tags["artist"], metadata.tags["album"]]) #print("\033[KArtist:", metadata.tags["artist"], end="\r") clearLine() print("Scanning", artistDir, next(spinner), end="\r") #if int(metadata.tags["totaldiscs"]) > 1: if metadata.tags.get("artist") is None: continue if metadata.tags.get("albumartist") is None: continue if metadata.tags.get('album') is None: continue if metadata.tags.get("musicbrainz_albumid") is None or len(metadata.tags.get("musicbrainz_albumid")) < 5: clearLine() print("Skipping:", os.path.join(albumDir, song)) continue #if 'Centennial' in song: # print # print # print("DEBUG") # print("Path:", artistDir) # print("Song:", song) # print("musicbrainz_albumid:", metadata.tags.get("musicbrainz_albumid")) # print(type(metadata.tags.get("musicbrainz_albumid"))) # sys.exit(0) artistName = sanitize(metadata.tags["albumartist"]) albumName = sanitize(metadata.tags["album"]) titleName = sanitize(metadata.tags["title"]) outPath = '' outFile = '' if int(metadata.tags.get("totaldiscs", 0)) > 1: outPath = os.path.join(config['converted']['path'], artistName, albumName) outFile = '{0:d}-{1:02d}-{2}.{3}'.format(int(metadata.tags["discnumber"]), int(metadata.tags["tracknumber"]), titleName, 'm4a') #print("iTunes:", os.path.join(iTunesLibraryRoot, # metadata.tags["artist"], # metadata.tags["album"], # '{0:d}-{1:02d}-{2}.{3}'.format(int(metadata.tags["discnumber"]), # int(metadata.tags["tracknumber"]), # metadata.tags["title"], # 'm4a'))) # #int(metadata.tags["discnumber"]) + '-' + '{0:02d}'.format(int(metadata.tags["tracknumber"])) + '-' + metadata.tags["title"] + ".m4a") else: if metadata.tags.get("tracknumber") is not None: outPath = os.path.join(config['converted']['path'], artistName, albumName) outFile = '{0:02d}-{1}.{2}'.format(int(metadata.tags["tracknumber"]), titleName, 'm4a') #print("iTunes:", os.path.join(iTunesLibraryRoot, # metadata.tags["artist"], # metadata.tags["album"], # '{0:02d}'.format(int(metadata.tags["tracknumber"])) + '-' + metadata.tags["title"] + ".m4a")) else: outPath = os.path.join(config['converted']['path'], artistName, albumName) outFile = '{0}.{1}'.format(titleName, 'm4a') #print("iTunes:", os.path.join(iTunesLibraryRoot, # metadata.tags["artist"], # metadata.tags["album"], # metadata.tags["title"] + ".m4a")) #print "iTunes: %s" % os.path.join(iTunesLibraryRoot, metadata.tags["artist"], metadata.tags["album"]) if not os.path.isfile(os.path.join(outPath, outFile)): print("NEW:", os.path.join(outPath, outFile)) if song.endswith('.m4a'): #print " Song: %s" % song metadata = MetaTag(os.path.join(albumDir, song)) #print "MetaData: %s" % metadata.tags clearLine() print() #LosslessLibraryRoot = '/srv/public/Music-Lossless' # #for subdir, dirs, files in os.walk(LosslessLibraryRoot): # print subdir # # for file in files: # if file.endswith('.flac'): # audio = mutagen.File(os.path.join(subdir, file)) # print file # #print audio.tags.pprint() # print "Artist: %s" % audio['ARTIST'][0] # print "Album: %s" % audio['ALBUM'][0] # print "Title: %s" % audio['TITLE'][0] # # #for file in files: # # print os.path.join(subdir, file)