#| default_exp ghtop
#| export
import sys, signal, shutil, os, json
# from dashing import *
from collections import defaultdict
from warnings import warn
from itertools import islice
from fastcore.utils import *
from fastcore.foundation import *
from fastcore.script import *
from ghapi.all import *
from ghtop.richext import *
from ghtop.all_rich import (Console, Color, FixedPanel, box, Segments, Live,
grid, ConsoleOptions, Progress, BarColumn, Spinner, Table)
evts = load_sample_events()
#| export
ETYPES=PushEvent,PullRequestEvent,IssuesEvent,ReleaseEvent
def get_sparklines():
s1 = ESpark('Push', 'magenta', [PushEvent], mx=30)
s2 = ESpark('PR', 'yellow', [PullRequestEvent, PullRequestReviewCommentEvent, PullRequestReviewEvent], mx=8)
s3 = ESpark('Issues', 'green', [IssueCommentEvent,IssuesEvent], mx=6)
s4 = ESpark('Releases', 'blue', [ReleaseEvent], mx=0.4)
s5 = ESpark('All Events', 'orange', mx=45)
return Stats([s1,s2,s3,s4,s5], store=5, span=5, spn_lbl='5/s', show_freq=True)
#| export
term = Console()
tdim = L(os.popen('stty size', 'r').read().split())
if not tdim: theight,twidth = 15,15
else: theight,twidth = tdim.map(lambda x: max(int(x)-4, 15))
#| export
def _exit(msg):
print(msg, file=sys.stderr)
sys.exit()
When creating GhApi we can pass a callback which will be called after each API operation. In this case, we use it to warn the user when their quota is getting low.
#| exports
def limit_cb(rem,quota):
"Callback to warn user when close to using up hourly quota"
w='WARNING '*7
if rem < 1000: print(f"{w}\nRemaining calls: {rem} out of {quota}\n{w}", file=sys.stderr)
#| export
def pct_comp(api): return int(((5000-int(api.limit_rem)) / 5000) * 100)
#| export
def tail_events(evt, api):
"Print events from `fetch_events` along with a counter of push events"
p = FixedPanel(theight, box=box.HORIZONTALS, title='ghtop')
s = get_sparklines()
g = grid([[s], [p]])
with Live(g):
for e in evt:
s.add_events(e)
s.update_prog(pct_comp(api))
p.append(e)
g = grid([[s], [p]])
#| export
def _user_grid():
g = Table.grid(expand=True)
g.add_column(justify="left")
for i in range(4): g.add_column(justify="center")
g.add_row("", "", "", "", "")
g.add_row("User", "Events", "PRs", "Issues", "Pushes")
return g
#| export
def watch_users(evts, api):
"Print a table of the users with the most events"
users,users_events = defaultdict(int),defaultdict(lambda: defaultdict(int))
with Live() as live:
s = get_sparklines()
while True:
for x in islice(evts, 10):
users[x.actor.login] += 1
users_events[x.actor.login][x.type] += 1
s.add_events(x)
ig = _user_grid()
sorted_users = sorted(users.items(), key=lambda o: (o[1],o[0]), reverse=True)
for u in sorted_users[:theight]:
data = (*u, *itemgetter('PullRequestEvent','IssuesEvent','PushEvent')(users_events[u[0]]))
ig.add_row(*L(data).map(str))
s.update_prog(pct_comp(api))
g = grid([[s], [ig]])
live.update(g)
#| export
def _panelDict2Grid(pd):
ispush,ispr,isiss,isrel = pd.values()
return grid([[ispush,ispr],[isiss,isrel]], width=twidth)
def quad_logs(evts, api):
"Print 4 panels, showing most recent issues, commits, PRs, and releases"
pd = {o:FixedPanel(height=(theight//2)-1,
width=(twidth//2)-1,
box=box.HORIZONTALS,
title=camel2words(remove_suffix(o.__name__,'Event'))) for o in ETYPES}
p = _panelDict2Grid(pd)
s = get_sparklines()
g = grid([[s], [p]])
with Live(g):
for e in evts:
s.add_events(e)
s.update_prog(pct_comp(api))
typ = type(e)
if typ in pd: pd[typ].append(e)
p = _panelDict2Grid(pd)
g = grid([[s], [p]])
#| export
def simple(evts, api):
for ev in evts: print(f"{ev.actor.login} {ev.type} {ev.repo.name}")
#| export
def _get_token():
path = Path.home()/".ghtop_token"
if path.is_file():
try: return path.read_text().strip()
except: _exit("Error reading token")
else: token = github_auth_device()
path.write_text(token)
return token
#| export
def _signal_handler(sig, frame):
if sig != signal.SIGINT: return
term.clear()
sys.exit(0)
_funcs = dict(tail=tail_events, quad=quad_logs, users=watch_users, simple=simple)
_filts = str_enum('_filts', 'users', 'repo', 'org')
_OpModes = str_enum('_OpModes', *_funcs)
@call_parse
def main(mode: Param("Operation mode to run", _OpModes),
include_bots: Param("Include bots (there's a lot of them!)", store_true)=False,
types: Param("Comma-separated types of event to include (e.g PushEvent)", str)='',
pause: Param("Number of seconds to pause between requests to the GitHub api", float)=0.4,
filt: Param("Filtering method", _filts)=None,
filtval: Param("Value to filter by (for `repo` use format `owner/repo`)", str)=None):
signal.signal(signal.SIGINT, _signal_handler)
types = types.split(',') if types else None
if filt and not filtval: _exit("Must pass `filter_value` if passing `filter_type`")
if filtval and not filt: _exit("Must pass `filter_type` if passing `filter_value`")
kwargs = {filt:filtval} if filt else {}
api = GhApi(limit_cb=limit_cb, token=_get_token())
evts = api.fetch_events(types=types, incl_bot=include_bots, pause=float(pause), **kwargs)
_funcs[mode](evts, api)
#| include: false
from nbdev import nbdev_export
nbdev_export()
Converted 00_ghtop.ipynb. Converted index.ipynb. Converted richext.ipynb.