# default_exp core
API details.
#export
import json,tweepy,hmac,hashlib,traceback,shutil,time,fcntl,re
from fastcore.imports import *
from fastcore.foundation import *
from fastcore.utils import *
from fastcore.script import *
from fastcore.meta import *
from fastcore.test import *
from configparser import ConfigParser
from ipaddress import ip_address,ip_network
from socketserver import ThreadingTCPServer
from fastcgi.http import MinimalHTTPHandler
from fastcgi import ReuseThreadingServer
from ghapi.all import GhApi
from textwrap import dedent
#export
def clean_tweet_body(body):
"Cleans links and sets proper @'s in the tweet body"
links = re.findall(r'\[([^\]]+)\]\(([^)]+)\)', body)
for issue, link in links:
str_replace = ""
if "@" in issue:
str_replace = issue[1:]
username = GhApi().users.get_by_username(str_replace).twitter_username
if username: str_replace = f"@{username}"
original_link = f"[{issue}]({link})"
else: original_link = f" ([{issue}]({link}))"
body = body.replace(original_link, str_replace)
body = body.replace("### ", "")
return body
#hide
body = '''### New Features
- Some dummy feature ([#1234](https://github.com/user/repo/link-to-pr))
### Bugs Squashed
- Some dummy bugfix ([#2345](https://github.com/user/repo/linktoissue)), thanks to [@jph00](https://github.com/jph00)'''
cleaned_body = '''New Features
- Some dummy feature
Bugs Squashed
- Some dummy bugfix, thanks to @jeremyphoward'''
test_eq(clean_tweet_body(body), cleaned_body)
#hide
body = '''### New Features
- Some dummy feature ([#1234](https://github.com/user/repo/link-to-pr))
### Bugs Squashed
- Some dummy bugfix ([#2345](https://github.com/user/repo/linktoissue)), thanks to [@fastai](https://github.com/fastai)'''
cleaned_body = '''New Features
- Some dummy feature
Bugs Squashed
- Some dummy bugfix, thanks to fastai'''
test_eq(clean_tweet_body(body), cleaned_body)
#export
def tweet_text(payload):
"Send a tweet announcing release based on `payload`"
rel_json = payload['release']
url = rel_json['url']
owner,repo = re.findall(r'https://api.github.com/repos/([^/]+)/([^/]+)/', url)[0]
tweet_tmpl = "New #{repo} release: v{tag_name}. {html_url}\n\n{body}"
res = tweet_tmpl.format(repo=repo, tag_name=rel_json['tag_name'],
html_url=rel_json['html_url'], body=clean_tweet_body(rel_json['body']))
if len(res)<=280: return res
return res[:279] + "…"
#export
def check_sig(content, headers, secret):
digest = hmac.new(secret, content, hashlib.sha1).hexdigest()
if f'sha1={digest}' == headers.get('X-Hub-Signature'): return True
self.wfile.write(b'mismatch')
return False
#export
class _RequestHandler(MinimalHTTPHandler):
def _post(self):
assert self.command == 'POST'
if self.server.check_ip:
src_ip = re.split(', *', self.headers.get('X-Forwarded-For', ''))[0] or self.client_address[0]
src_ip = ip_address(src_ip)
assert any((src_ip in wl) for wl in self.server.whitelist)
self.send_response(200)
self.end_headers()
length = self.headers.get('content-length')
if not length: return
content = self.rfile.read(int(length))
if self.server.debug:
print(self.headers, content)
return
payload = json.loads(content.decode())
if payload.get('action',None)!='released': return
if not check_sig(content, self.headers, self.server.gh_secret): return
tweet = tweet_text(payload)
stat = self.server.api.update_status(tweet)
self.wfile.write(f'{stat.id}'.encode())
self.wfile.write(b'ok')
def handle(self):
try: self._post()
except Exception as e: sys.stderr.write(traceback.format_exc())
def log_message(self, fmt, *args): sys.stderr.write(fmt%args)
#export
def reconfig(s):
if hasattr(s, 'reconfigure'): return s.reconfigure(line_buffering=True)
try:
fl = fcntl.fcntl(s.fileno(), fcntl.F_GETFL)
fl |= os.O_SYNC
fcntl.fcntl(s.fileno(), fcntl.F_SETFL, fl)
except io.UnsupportedOperation: pass
#export
@call_parse
def run_server(
hostname:str='localhost', # Host name or IP
port:int=8000, # Port to listen on
debug:bool_arg=False, # If True, do not trigger actions, just print
inifile:str='twitter.ini', # Path to settings ini file
check_ip:bool_arg=True, # Check source IP against GitHub list
single_request:bool_arg=False # Handle one request
):
"Run a GitHub webhook server that tweets about new releases"
assert os.path.exists(inifile), f"{inifile} not found"
cfg = ConfigParser(interpolation=None)
cfg.read([inifile])
cfg = cfg['DEFAULT']
auth = tweepy.OAuthHandler(cfg['consumer_key'], cfg['consumer_secret'])
auth.set_access_token(cfg['access_token'], cfg['access_token_secret'])
os.environ['PYTHONUNBUFFERED'] = '1'
print(f"Listening on {(hostname,port)}")
with ReuseThreadingServer((hostname, port), _RequestHandler) as httpd:
httpd.gh_secret = bytes(cfg['gh_secret'], 'utf-8')
httpd.api = tweepy.API(auth)
httpd.whitelist = L(urljson('https://api.github.com/meta')['hooks']).map(ip_network)
httpd.check_ip,httpd.debug = check_ip,debug
if single_request: httpd.handle_request()
else:
try: httpd.serve_forever()
except KeyboardInterrupt: print("Closing")
time.sleep(0.5) # wait for previous server to stop
threaded(partial(run_server, check_ip=False, debug=True, single_request=True))();
Listening on ('localhost', 8000)
time.sleep(0.5)
urlread("http://localhost:8000", spam=1)
Accept-Encoding: identity Content-Type: application/x-www-form-urlencoded Content-Length: 6 Host: localhost:8000 Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9 Accept-Language: en-US,en;q=0.9 Cache-Control: max-age=0 Sec-Fetch-Dest: document Sec-Fetch-Mode: navigate Sec-Fetch-Site: none Sec-Fetch-User: ?1 Upgrade-Insecure-Requests: 1 User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36 Connection: close b'spam=1'
''
#export
@call_parse
def fastwebhook_install_service(
hostname:str='0.0.0.0', # Host name or IP
port:int=8000, # Port to listen on
inifile:str='twitter.ini', # Path to settings ini file
check_ip:bool_arg=True, # Check source IP against GitHub list
service_path:str="/etc/systemd/system/" # Directory to write service file to
):
"Install fastwebhook as a service"
script_loc = shutil.which('fastwebhook')
inifile = Path(inifile).absolute()
_unitfile = dedent(f"""
[Unit]
Description=fastwebhook
Wants=network-online.target
After=network-online.target
[Service]
ExecStart={script_loc} --inifile {inifile} --check_ip {check_ip} --hostname {hostname} --port {port}
Restart=always
[Install]
WantedBy=multi-user.target""")
Path("fastwebhook.service").write_text(_unitfile)
run(f"sudo cp fastwebhook.service {service_path}")
This fastcore.script CLI installs fastwebhook as a systemd service. Run fastwebhook_install_service --help in your terminal for options.
#hide
from nbdev.export import notebook2script
notebook2script()
Converted 00_core.ipynb. Converted index.ipynb.