1
0
Fork 0
dotfiles/panel/panel_bar.py

223 lines
7.1 KiB
Python
Executable file

#!/usr/bin/python3 -u
import subprocess
import os
import select
import sys
import fcntl
import time
import datetime
import socket
# Seconds between periodic refreshes of the bar (used to compute the
# select() timeout in main()).
PERIOD = 1
# Lemonbar seems to struggle if you update too fast, so main() refuses to
# redraw more often than this many seconds. With 0 the throttle never fires.
MIN_UPDATE_INTERVAL = 0
# Directory containing this script (symlinks resolved).
PATH = os.path.dirname(os.path.realpath(__file__))
# Default colours used by color_string().
# NOTE(review): values look like lemonbar '#AARRGGBB' colours -- confirm.
PANEL_FOREGROUND='#FF888888'
PANEL_BACKGROUND='#FF222222'
# Foreground for focused (uppercase) items in the window-manager report.
COLOR_FOCUSED_FG = '#FFE0E0E0'
# Not referenced by the code visible in this file.
COLOR_OCCUPIED_FG = '#FFA3A6AB'
# Background for urgent desktops and for a nearly empty battery.
COLOR_URGENT_BG = '#FF880000'
# Background of the volume segment while the sink is not muted.
COLOR_WARNING_BG = '#FFeeee00'
# Text of the separator drawn between adjacent segments.
DIVIDER = '|'
def color_string(string, fg=PANEL_FOREGROUND, bg=PANEL_BACKGROUND):
    """Wrap *string* in foreground/background colour markup.

    The text is padded with a single space on each side, preceded by
    ``%{F<fg>}%{B<bg>}`` and followed by ``%{B-}%{F-}``, which reset both
    colours again.
    """
    opening = "%{F" + str(fg) + "}%{B" + str(bg) + "}"
    closing = "%{B-}%{F-}"
    return opening + " " + str(string) + " " + closing
def button(text, executable):
    """Return *text* wrapped in a ``%{A:<command>:}`` ... ``%{A}`` tag.

    A leading ``~`` in *executable* is expanded with os.path.expanduser
    before it is embedded as the command.
    """
    command = os.path.expanduser(executable)
    return ''.join(('%{A:', command, ':}', text, '%{A}'))
# Pre-rendered separator (DIVIDER in a dim foreground) that make_string()
# joins adjacent segments with.
divider = color_string(DIVIDER, fg='#FF444444')
def status_update(data):
    """Render one window-manager report line as the workspace segment.

    *data* is a bytes line of the form ``W<item>:<item>:...`` terminated by
    a newline. The first character of each item selects its styling from
    the table below and the remainder is the text shown. Items whose style
    is ``None`` are hidden, and items with a prefix that is not in the table
    are ignored instead of raising ``KeyError``.

    Returns the pair ``('status', markup)``.

    Raises ValueError if the line does not start with ``W``.
    """
    # monitor, occupied, free, urgent (with uppercase meaning focused)
    colors = {
        'm': None,
        'M': None,
        'o': {},
        'O': {'fg': COLOR_FOCUSED_FG},
        'f': None,
        'F': {'fg': COLOR_FOCUSED_FG},
        'u': {'bg': COLOR_URGENT_BG},
        'U': {'fg': COLOR_FOCUSED_FG, 'bg': COLOR_URGENT_BG},
        'L': None,  # focused desktop layout
        'T': None,  # focused node state
        'G': None,  # focused node active flags
    }
    # Drop only a real trailing newline; a blind [:-1] would eat the last
    # character of a line that arrived without one.
    report = data.decode('utf-8').rstrip('\n')
    if not report.startswith('W'):
        # An explicit raise survives `python -O`, unlike an assert.
        raise ValueError('unexpected report line: %r' % (report,))
    segments = []
    for item in report[1:].split(':'):
        if not item:
            continue
        # Unknown prefixes map to None and are skipped like hidden items.
        color = colors.get(item[0])
        if color is not None:
            segments.append(color_string(item[1:], **color))
    return 'status', ''.join(segments)
def clock_update(_):
    """Return ``('clock', markup)`` for the current local time.

    The argument is ignored. The time is formatted as abbreviated weekday,
    day of month, abbreviated month and HH:MM:SS.
    """
    timestamp = time.strftime("%a %d %b %H:%M:%S")
    rendered = color_string(timestamp)
    return ('clock', rendered)
def volume_update(_):
    """Return ``('volume', markup)`` built from ``pactl list sinks``.

    The argument is ignored. The output is scanned for ``Volume:`` and
    ``Mute:`` lines; when several sinks are listed the values of the last
    one win. The percentage is taken from the second ``/``-separated field
    of the ``Volume:`` line.

    If the output contains no ``Volume:`` or no ``Mute:`` line (for example
    because there is no sink), ``('volume', None)`` is returned so that the
    segment is left out of the bar instead of raising UnboundLocalError.
    """
    output = subprocess.check_output(['pactl', 'list', 'sinks']).decode('utf-8')
    volume = None
    mute = None
    for line in output.split('\n'):
        line = line.strip()
        if line.startswith('Volume:'):
            # e.g. "... / 45% / ..." -> "45"
            volume = line.split('/')[1].strip()[:-1]
        elif line.startswith('Mute:'):
            mute = line.split(' ')[1].strip()
    if volume is None or mute is None:
        return 'volume', None
    volume = int(volume)
    muted = mute == 'yes'
    mute_icon = "-" if muted else "%"
    kwargs = {}
    if not muted:
        # Audible output is highlighted with the warning background.
        kwargs['bg'] = COLOR_WARNING_BG
    return 'volume', color_string("Vol: %d%s" % (volume, mute_icon), **kwargs)
def wifi_update(_):
    """Return ``('wifi', markup)`` built from ``netctl-auto list``.

    The argument is ignored. The first output line starting with ``*`` is
    taken as the active profile; its name (after the two-character marker)
    is split at the first ``-`` into interface and network. With no active
    profile the segment reads ``none: -``. A profile name that contains no
    ``-`` is shown in the interface position with ``-`` as the network
    rather than raising ValueError.
    """
    info = subprocess.check_output(['netctl-auto', 'list'])
    lines = [line.decode('utf-8') for line in info.splitlines()]
    # startswith() is safe on empty lines, where line[0] would raise.
    active = [line[2:] for line in lines if line.startswith('*')]
    if active:
        interface, separator, network = active[0].partition('-')
        if not separator:
            network = '-'
    else:
        interface, network = 'none', '-'
    return 'wifi', color_string('{}: {}'.format(interface, network))
def battery_update(_):
    """Return ``('battery', markup)`` built from ``acpi --battery``.

    The argument is ignored. ``('battery', None)`` is returned, so that the
    segment is left out of the bar, when /sys/class/power_supply cannot be
    listed, contains no entry starting with ``BAT``, or acpi prints nothing.

    Otherwise the first line of acpi's output is split into a state and a
    percentage. The segment gets the urgent background when the state is
    not ``Charging`` and the charge is below 10 percent.
    """
    # Listing the directory directly avoids spawning `ls` on every refresh.
    try:
        supplies = os.listdir('/sys/class/power_supply')
    except OSError:
        supplies = []
    if not any(name.startswith('BAT') for name in supplies):
        return 'battery', None
    info = subprocess.check_output(['acpi', '--battery']).decode('utf-8')
    lines = info.splitlines()
    if not lines:
        return 'battery', None
    # Split once only, so a later ': ' in the line cannot break the unpack.
    _, status = lines[0].split(': ', 1)
    state, charge, *_ = status.split(', ')
    charge = int(charge[:-1])  # drop the trailing '%'
    colors = {}
    if state != 'Charging' and charge < 10:
        colors['bg'] = COLOR_URGENT_BG
    return 'battery', color_string("%s %d%%" % (state, charge), **colors)
def mail_update(_):
    """Return ``('mail', markup)`` with one clickable counter per account.

    The argument is ignored. For each known account, in alphabetical order,
    the entries of ``~/.mail/<account>/Inbox/new`` are counted; accounts
    whose directory does not exist are skipped. A non-zero count is drawn
    in white on the account's colour. Each counter is a button that runs
    mutt in urxvt with the account's ``~/.mutt/<account>`` config.
    """
    account_colors = {
        'gmail': '#990000',
        'metagram': '#009900',
        'mit': '#000099',
    }
    segments = []
    for account in sorted(account_colors):
        inbox = os.path.expanduser('~/.mail/{}/Inbox/new'.format(account))
        try:
            unread = len(os.listdir(inbox))
        except FileNotFoundError:
            continue
        if unread:
            style = {'bg': account_colors[account], 'fg': '#FFFFFF'}
        else:
            style = {}
        mutt_config = os.path.expanduser('~/.mutt/{}'.format(account))
        command = 'urxvt -e mutt -F {} &'.format(mutt_config)
        label = button(str(unread), command)
        segments.append(color_string(label, **style))
    return 'mail', ''.join(segments)
def make_string(status, clock, volume, battery, wifi, mail):
    """Assemble the full bar line from the segments and print it.

    The left section holds the status segment, the centre section is
    empty, and the right section holds wifi, mail, volume, battery and
    clock in that order, joined by the divider. Empty or ``None`` segments
    on the right are left out.
    """
    right_segments = [
        segment
        for segment in (wifi, mail, volume, battery, clock)
        if segment
    ]
    sections = (
        '%{l}' + divider.join((status,)),
        '%{c}' + divider.join(()),
        '%{r}' + divider.join(right_segments),
    )
    print(''.join(sections))
def open_socket(address):
    """Create a listening Unix stream socket bound to the path *address*.

    Any existing file at *address* is removed first. The returned socket
    has a listen backlog of 1.

    Raises OSError if the old file cannot be removed while it still exists,
    or if binding or listening fails; in the latter case the new socket is
    closed before the error propagates.
    """
    try:
        os.unlink(address)
    except OSError:
        # A missing file is fine; something still occupying the path is not.
        if os.path.exists(address):
            raise
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        sock.bind(address)
        sock.listen(1)
    except Exception:
        # Do not leak the descriptor when the socket cannot be set up.
        sock.close()
        raise
    return sock
def main(argv):
    """Run the panel loop, printing a new bar line on every refresh.

    Starts ``bspc subscribe`` and waits on its output with select(). Each
    report line is handed to status_update(); on every wake-up (event or
    timeout) the wifi, clock, volume, battery and mail segments are
    recomputed and make_string() prints the bar, unless the previous
    redraw happened less than MIN_UPDATE_INTERVAL seconds ago.

    *argv* is accepted but not used. Returns 1 when a watched stream yields
    an empty read; otherwise the loop never returns.
    """
    # 'wifi' is not seeded here; it is filled in inside the loop before the
    # first call to make_string().
    state_strings = {
        'status': "",
        'battery': battery_update(None)[1],
        'volume': volume_update(None)[1],
        'clock': clock_update(None)[1],
        'mail': mail_update(None)[1],
    }
    # These processes trigger when events happen.
    bspc_control = subprocess.Popen(["bspc", "subscribe"],
                                    stdout=subprocess.PIPE)
    listening = [bspc_control]
    # Make stdout nonblocking.
    # Sometimes select says there's output, but read fails.
    # This makes it possible to handle those cases gracefully.
    for l in listening:
        fd = l.stdout.fileno()
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    read_list = [l.stdout for l in listening]
    # These sockets trigger when events happen
    # (currently none are registered, so the accept() branch below is idle).
    socks = []
    read_list.extend(socks)
    previous_update = 0
    now = datetime.datetime.now()
    # Seconds until the next PERIOD boundary of the wall clock.
    timeout = PERIOD-(now.second % PERIOD)
    # Maps each watched file object to the function that parses its data
    # and returns a (state key, markup) pair.
    file_to_fns = {
        bspc_control.stdout: status_update,
        # volume_sock: volume_update,
    }
    while True:
        to_read, _, _ = select.select(read_list, [], [], timeout)
        for r in to_read:
            if r in socks:
                # Listening socket: take one connection and read one message.
                connection, _ = r.accept()
                data = connection.recv(1024)
                connection.close()
            else:
                try:
                    data = r.readline()
                except OSError:
                    # Some weird cases might get you here
                    continue
            # An empty read is treated as the producer having exited.
            if (data == b''):
                print("One of our scripts died?", file=sys.stderr)
                return 1
            state_type, result = file_to_fns[r](data)
            state_strings[state_type] = result
        # Polled segments are refreshed on every wake-up, event or timeout.
        state_strings['wifi'] = wifi_update(None)[1]
        state_strings['clock'] = clock_update(None)[1]
        state_strings['volume'] = volume_update(None)[1]
        state_strings['battery'] = battery_update(None)[1]
        state_strings['mail'] = mail_update(None)[1]
        if time.time() - previous_update < MIN_UPDATE_INTERVAL:
            # Too soon after the last redraw: wait out the remainder.
            timeout = MIN_UPDATE_INTERVAL - (time.time()-previous_update)
        else:
            make_string(**state_strings)
            previous_update = time.time()
            now = datetime.datetime.now()
            period_timeout = PERIOD-(now.second % PERIOD)
            timeout = max(period_timeout, MIN_UPDATE_INTERVAL)
# Run the panel when executed as a script; main()'s return value becomes the
# process exit status.
if __name__ == "__main__":
    sys.exit(main(sys.argv))