# default_exp core #export import json,tweepy,hmac,hashlib,traceback,shutil,time,fcntl from fastcore.imports import * from fastcore.foundation import * from fastcore.utils import * from fastcore.script import * from fastcore.meta import * from fastcore.test import * from configparser import ConfigParser from ipaddress import ip_address,ip_network from socketserver import ThreadingTCPServer from fastcgi.http import MinimalHTTPHandler from textwrap import dedent #export class ReuseThreadingServer(ThreadingTCPServer): allow_reuse_address = 1 #export def tweet_text(payload): "Send a tweet announcing release based on `payload`" rel_json = payload['release'] url = rel_json['url'] owner,repo = re.findall(r'https://api.github.com/repos/([^/]+)/([^/]+)/', url)[0] tweet_tmpl = "New #{repo} release: v{tag_name}. {html_url}\n\n{body}" res = tweet_tmpl.format(repo=repo, tag_name=rel_json['tag_name'], html_url=rel_json['html_url'], body=rel_json['body']) if len(res)<=280: return res return res[:279] + "…" #export def check_sig(content, headers, secret): digest = hmac.new(secret, content, hashlib.sha1).hexdigest() assert f'sha1={digest}' == headers.get('X-Hub-Signature') #export class _RequestHandler(MinimalHTTPHandler): def _post(self): assert self.command == 'POST' if self.server.check_ip: src_ip = re.split(', *', self.headers.get('X-Forwarded-For', ''))[0] or self.client_address[0] src_ip = ip_address(src_ip) assert any((src_ip in wl) for wl in self.server.whitelist) self.send_response(200) self.end_headers() length = self.headers.get('content-length') if not length: return content = self.rfile.read(int(length)) if self.server.debug: print(self.headers, content) return payload = json.loads(content.decode()) if payload.get('action',None)=='released': check_sig(content, self.headers, self.server.gh_secret) tweet = tweet_text(payload) stat = self.server.api.update_status(tweet) print(stat.id) self.wfile.write(b'ok') def handle(self): try: self._post() except Exception as e: sys.stderr.write(traceback.format_exc()) def log_message(self, fmt, *args): sys.stderr.write(fmt%args) #export def reconfig(s): if hasattr(s, 'reconfigure'): return s.reconfigure(line_buffering=True) try: fl = fcntl.fcntl(s.fileno(), fcntl.F_GETFL) fl |= os.O_SYNC fcntl.fcntl(s.fileno(), fcntl.F_SETFL, fl) except io.UnsupportedOperation: pass #export @call_parse def run_server(hostname: Param("Host name or IP", str)='localhost', port: Param("Port to listen on", int)=8000, debug: Param("If True, do not trigger actions, just print", bool_arg)=False, inifile: Param("Path to settings ini file", str)='twitter.ini', check_ip: Param("Check source IP against GitHub list", bool_arg)=True, single_request: Param("Handle one request", bool_arg)=False): "Run a GitHub webhook server that tweets about new releases" assert os.path.exists(inifile), f"{inifile} not found" cfg = ConfigParser(interpolation=None) cfg.read([inifile]) cfg = cfg['DEFAULT'] auth = tweepy.OAuthHandler(cfg['consumer_key'], cfg['consumer_secret']) auth.set_access_token(cfg['access_token'], cfg['access_token_secret']) os.environ['PYTHONUNBUFFERED'] = '1' print(f"Listening on {(hostname,port)}") with ReuseThreadingServer((hostname, port), _RequestHandler) as httpd: httpd.gh_secret = bytes(cfg['gh_secret'], 'utf-8') httpd.api = tweepy.API(auth) httpd.whitelist = L(urljson('https://api.github.com/meta')['hooks']).map(ip_network) httpd.check_ip,httpd.debug = check_ip,debug if single_request: httpd.handle_request() else: try: httpd.serve_forever() except KeyboardInterrupt: print("Closing") time.sleep(0.5) # wait for previous server to stop threaded(partial(run_server, check_ip=False, debug=True, single_request=True))(); time.sleep(0.5) urlread("http://localhost:8000", spam=1) #export @call_parse def fastwebhook_install_service(hostname: Param("Host name or IP", str)='0.0.0.0', port: Param("Port to listen on", int)=8000, inifile: Param("Path to settings ini file", str)='twitter.ini', check_ip: Param("Check source IP against GitHub list", bool_arg)=True, service_path: Param("Directory to write service file to", str)="/etc/systemd/system/"): "Install fastwebhook as a service" script_loc = shutil.which('fastwebhook') inifile = Path(inifile).absolute() _unitfile = dedent(f""" [Unit] Description=fastwebhook Wants=network-online.target After=network-online.target [Service] ExecStart={script_loc} --inifile {inifile} --check_ip {check_ip} --hostname {hostname} --port {port} Restart=always [Install] WantedBy=multi-user.target""") Path("fastwebhook.service").write_text(_unitfile) run(f"sudo cp fastwebhook.service {service_path}") #hide from nbdev.export import notebook2script notebook2script()