Add files via upload

This commit is contained in:
LukeSpad 2021-12-17 18:43:47 +00:00 committed by GitHub
parent 09cb4b23d0
commit d58e5a0c77
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

155
main.py
View file

@ -17,23 +17,63 @@ if __name__ == '__main__':
# Define callback function for JSON data dump from B&O Gateway # Define callback function for JSON data dump from B&O Gateway
def cb(name, header, payload, message): def cb(name, header, payload, message):
# Report message content to log # Sanitise messages
message_log(name, header, payload, message) # Check for standby conditions
try:
if message['State_Update']['state'] == '':
message['State_Update']['state'] = 'Standby'
except KeyError:
pass
try:
if message['State_Update']['source'] == '':
message['State_Update']['state'] = 'Standby'
except KeyError:
pass
# Sanitise unknown Channel/Tracks
try:
if message['State_Update']['nowPlayingDetails']['channel_track'] in [0, 255, '0', '255']:
del(message['State_Update']['nowPlayingDetails']['channel_track'])
except KeyError:
pass
try:
if message['State_Update']['source'] == "A.TAPE2/N.MUSIC":
del(message['State_Update']['nowPlayingDetails']['channel_track'])
except KeyError:
pass
try: # Transport controls for N.MUSIC iTunes control
if message['State_Update']['source'] == "A.TAPE2/N.MUSIC":
CONST.gateway['Current Audio Source'] = "A.TAPE2/N.MUSIC"
try:
if message['Device'] not in CONST.gateway['N.MUSIC Renderers']:
CONST.gateway['N.MUSIC Renderers'].append(message['Device'])
finally:
iTunes_transport_control(message)
elif message['State_Update']['source'] != "A.TAPE2/N.MUSIC":
if message['State_Update']['source'] in CONST.source_type_dict.get("Audio Sources"):
CONST.gateway['Now Playing'] = 'Unknown'
CONST.gateway['N.MUSIC Renderers'] = []
except KeyError:
pass
# If any AV renderers are playing N.Music, provide an update of current track
if CONST.gateway['N.MUSIC Renderers'] and CONST.gateway['Current Audio Source'] == "A.TAPE2/N.MUSIC":
_get_track_info(message)
# If State Update received then post update in Notification Center # If State Update received then post update in Notification Center
notify_and_update(message) notify_and_update(message)
try: # Transport controls for N.MUSIC iTunes control # when count of N.Music renderers is zero, stop iTunes playback
if message['State_Update']['source'] == "A.TAPE2/N.MUSIC": if not CONST.gateway['N.MUSIC Renderers']:
iTunes_transport_control(message) iTunes.stop()
elif message['State_Update']['source'] != "A.TAPE2/N.MUSIC":
iTunes.stop() # Report message content to log
except KeyError: message_log(name, header, payload, message)
pass
def notify_and_update(message): def notify_and_update(message):
# Post Source change information to Notification Center # Post Source change information to Notification Center
try: if 'State_Update' in message and 'source' in message['State_Update']:
current_source = str(message['State_Update']['source']) current_source = str(message['State_Update']['source'])
# Check to see if this is a new Music Source notification # Check to see if this is a new Music Source notification
@ -41,20 +81,22 @@ if __name__ == '__main__':
update_devices(message, current_source, 'Audio Source') update_devices(message, current_source, 'Audio Source')
if current_source not in ['', CONST.gateway['Current Audio Source']]: if current_source not in ['', CONST.gateway['Current Audio Source']]:
CONST.gateway['Current Audio Source'] = current_source CONST.gateway['Current Audio Source'] = current_source
iTunes.notify("Current Audio Source: " + str(message['State_Update']['sourceName'])) iTunes.notify("Current Audio Source: " + str(message['State_Update']['sourceName']),
"Audio Source Update")
# Check to see if this is a new Video Source notification # Check to see if this is a new Video Source notification
elif current_source in CONST.source_type_dict.get("Video Sources"): elif current_source in CONST.source_type_dict.get("Video Sources"):
update_devices(message, current_source, 'Video Source') update_devices(message, current_source, 'Video Source')
if current_source not in ['', CONST.gateway['Current Video Source']]: if current_source not in ['', CONST.gateway['Current Video Source']]:
CONST.gateway['Current Video Source'] = current_source CONST.gateway['Current Video Source'] = current_source
iTunes.notify("Current Video Source: " + str(message['State_Update']['sourceName'])) iTunes.notify("Current Video Source: " + str(message['State_Update']['sourceName']),
"Video Source Update")
# If source is not in Audio or Video dictionaries then it's a standby message # If source is not in Audio or Video dictionaries then it's a standby message
else: else:
update_devices(message, current_source, 'No Source') update_devices(message, current_source, 'No Source')
except KeyError: elif 'State_Update' in message and 'state' in message['State_Update']:
pass update_devices(message, 'No Source', 'No Source')
def update_devices(message, current_source, source_type): def update_devices(message, current_source, source_type):
# Loop over devices to find device sending update message # Loop over devices to find device sending update message
@ -67,14 +109,21 @@ if __name__ == '__main__':
return return
# If in standby, set current source to None # If in standby, set current source to None
elif message['State_Update']['state'] in ['', 'Standby'] or \ if message['State_Update']['state'] == 'Standby':
message['State_Update']['source'] == '':
if device['State'] != 'Standby': if device['State'] != 'Standby':
iTunes.notify(str(device['Device']) + " is now on standby") iTunes.notify(str(device['Device']) + " is now on standby",
"System Standby")
device['State'] = 'Standby' device['State'] = 'Standby'
device['Now Playing'] = 'None' if message['Device'] in CONST.gateway['N.MUSIC Renderers']:
device['Current Source'] = 'None' CONST.gateway['N.MUSIC Renderers'].remove(message['Device'])
device['last update'] = time.time() device['Now Playing'] = 'None'
device['Current Source'] = 'None'
device['last update'] = time.time()
device['Channel/Track'] = '0'
try:
message['State_Update']['nowPlaying'] = 'None'
except KeyError:
pass
# Report player state and source # Report player state and source
elif message['State_Update']['state'] in ["None", "Play"]: elif message['State_Update']['state'] in ["None", "Play"]:
@ -82,24 +131,28 @@ if __name__ == '__main__':
message['State_Update']['nowPlaying'] != device['Now Playing']: message['State_Update']['nowPlaying'] != device['Now Playing']:
iTunes.notify(str(device['Device']) + " now playing " + iTunes.notify(str(device['Device']) + " now playing " +
str(message['State_Update']['nowPlaying']) + " from source " + str(message['State_Update']['nowPlaying']) + " from source " +
str(message['State_Update']['sourceName'])) str(message['State_Update']['sourceName']),
"Now Playing Update")
# Update device data # Update device data
device['Now Playing'] = message['State_Update']['nowPlaying'] device['Now Playing'] = message['State_Update']['nowPlaying']
device['Channel/Track'] = '0' CONST.gateway['Now Playing'] = device['Now Playing']
device['Channel/Track'] = 'NA'
device['last update'] = time.time() device['last update'] = time.time()
elif 'nowPlayingDetails' in message['State_Update'] and \ elif 'nowPlayingDetails' in message['State_Update'] and \
message['State_Update']['nowPlaying'] in ['', 'Unknown'] and \ message['State_Update']['nowPlaying'] in ['', 'Unknown'] and \
message['State_Update']['nowPlayingDetails']['channel_track'] \ message['State_Update']['nowPlayingDetails']['channel_track'] != \
not in [device['Channel/Track'], '0', '255']: device['Channel/Track']:
if source_type == "Audio Source": if source_type == "Audio Source":
iTunes.notify(str(device['Device']) + " now playing track " + iTunes.notify(str(device['Device']) + " now playing track " +
str(message['State_Update']['nowPlayingDetails']['channel_track']) + str(message['State_Update']['nowPlayingDetails']['channel_track']) +
" from source " + str(message['State_Update']['sourceName'])) " from source " + str(message['State_Update']['sourceName']),
"Now Playing Update")
elif source_type == "Video Source": elif source_type == "Video Source":
iTunes.notify(str(device['Device']) + " now playing channel " + iTunes.notify(str(device['Device']) + " now playing channel " +
str(message['State_Update']['nowPlayingDetails']['channel_track']) + str(message['State_Update']['nowPlayingDetails']['channel_track']) +
" from source " + str(message['State_Update']['sourceName'])) " from source " + str(message['State_Update']['sourceName']),
"Now Playing Update")
# Update device data # Update device data
device['Now Playing'] = 'Unknown' device['Now Playing'] = 'Unknown'
device['Channel/Track'] = message['State_Update']['nowPlayingDetails']['channel_track'] device['Channel/Track'] = message['State_Update']['nowPlayingDetails']['channel_track']
@ -107,26 +160,31 @@ if __name__ == '__main__':
elif message['State_Update']['state'] != device['State']: elif message['State_Update']['state'] != device['State']:
iTunes.notify(str(device['Device']) + " is now playing " + iTunes.notify(str(device['Device']) + " is now playing " +
str(message['State_Update']['sourceName'])) str(message['State_Update']['sourceName']),
"Source Update")
# Update device data # Update device data
device['Now Playing'] = "Unknown" device['Now Playing'] = "Unknown"
device['Channel/Track'] = '0' device['Channel/Track'] = 'NA'
device['last update'] = time.time() device['last update'] = time.time()
# Update the state of the device # Update the state of the device
device['Current Source'] = current_source device['Current Source'] = current_source
device['Current Source Type'] = source_type device['Current Source Type'] = source_type
device['State'] = message['State_Update']['state'] device['State'] = message['State_Update']['state']
logging.debug(json.dumps(device, indent=4))
except KeyError: except KeyError:
pass pass
# Now loop over devices and update any playing an audio source with the new distributed # Now loop over devices and update any playing an audio source with the new distributed
# source (the masterlink network only supports a single audio source for distribution) # source (the masterlink network only supports a single audio source for distribution)
if source_type == "Audio Source": try:
if device['Current Source'] in CONST.source_type_dict.get("Audio Sources"): if source_type == "Audio Source":
device['Current Source'] = current_source if device['Current Source'] in CONST.source_type_dict.get("Audio Sources"):
device['Current Source'] = current_source
device['last update'] = time.time()
except KeyError:
pass
logging.debug(json.dumps(device, indent=4))
def iTunes_transport_control(message): def iTunes_transport_control(message):
try: # If N.MUSIC command, trigger appropriate iTunes control try: # If N.MUSIC command, trigger appropriate iTunes control
@ -157,15 +215,12 @@ if __name__ == '__main__':
elif message['State_Update']['command'] == "Rewind": elif message['State_Update']['command'] == "Rewind":
iTunes.rewind(-10) iTunes.rewind(-10)
elif message['State_Update']['command'] == "Shift-1/Random":
iTunes.shuffle()
# If 'Info' pressed - update track info # If 'Info' pressed - update track info
elif message['State_Update']['command'] == "Info": elif message['State_Update']['command'] == "Info":
track_info = iTunes.get_current_track_info() _get_track_info(message)
logging.info("iTunes current Track Info:"
"\n\tTrack: " + track_info[0] +
"\n\tAlbum: " + track_info[1] +
"\n\tArtist: " + track_info[2] + "\n")
iTunes.notify("iTunes playing " + track_info[2] + " - " + track_info[0] +
" from album: " + track_info[1])
# If colour key pressed, execute the appropriate applescript # If colour key pressed, execute the appropriate applescript
elif message['State_Update']['command'] == "Green": elif message['State_Update']['command'] == "Green":
@ -190,6 +245,24 @@ if __name__ == '__main__':
except KeyError: except KeyError:
pass pass
def _get_track_info(message):
track_info = iTunes.get_current_track_info()
if track_info[0] not in [None, 'None']:
track_info = track_info[0] + "' by " + track_info[2] + " from the album '" + track_info[1] + "'"
if 'Type' in message and \
message['Type'] == "AV RENDERER" and 'nowPlaying' in message['State_Update']:
message['State_Update']['nowPlaying'] = track_info
if track_info != CONST.gateway['Now Playing']:
CONST.gateway['Now Playing'] = track_info
logging.info("iTUNES CURRENT TRACK INFO:\n\tNow playing: " + track_info + "\n")
if 'Device' in message and message['Type'] == "AV RENDERER ":
iTunes.notify(str(message['Device']) + ' now playing ' + track_info,
"Now Playing Update")
else:
iTunes.notify("iTunes now playing " + track_info,
"Now Playing Update")
def message_log(name, header, payload, message): def message_log(name, header, payload, message):
# Pretty formatting - convert to JSON format then remove braces # Pretty formatting - convert to JSON format then remove braces
message = json.dumps(message, indent=4) message = json.dumps(message, indent=4)
@ -242,7 +315,7 @@ if __name__ == '__main__':
9100, # BLGW Home Integration Protocol port - default 9100 9100, # BLGW Home Integration Protocol port - default 9100
23] # Telnet port - default 23 23] # Telnet port - default 23
user = 'admin' # User name - default is admin user = 'admin' # User name - default is admin
pwd = 'admin' # password - default is admin pwd = 'admin' # password - default is admin
# Instantiate an AppleScriptBridge MusicController for N.MUSIC control of apple Music # Instantiate an AppleScriptBridge MusicController for N.MUSIC control of apple Music
iTunes = ASBridge.MusicController() iTunes = ASBridge.MusicController()