"""A lightweight interface to monitoring tags on twitter.

Just pass your hashtag (with or without the leading hash), and you'll get
updates roughly every minute in the GUI, or every 100 seconds in text mode.
"""

from urllib import parse as urlparse
import base64
import datetime
import json
import optparse
import os
import re
import sys
import textwrap
import threading
import time
import webbrowser
import zlib

import requests
import tweepy

# if an obnoxious spammer is under way at a time, put a string
# they're spamming in here.
STOP_STRINGS = []


class TwitterError(Exception):
    pass


class SkipThis(Exception):
    """Raised by Tweet.__init__ for statuses that should not be shown."""


# NOTE(review): API credentials are hard-coded in the source; consider
# loading them from the environment or a config file instead.
APP_KEY = "M4JHyd0tyaqqoEZRdyBg"
APP_SECRET = "3aEgiF1vpM7EjUh6zkK5ywXRxROZnT4vOMZj7ZaSR8"

# State files: the id of the newest tweet seen so far, and the author
# names to ignore (one per line).
LASTID_PATH = os.path.expanduser("~/.twitupdate")
BLACKLIST_PATH = os.path.expanduser("~/.twitupdate.blacklist")

try:
    import alsaaudio
    AUDIO_SUPPORTED = True
except ImportError:
    AUDIO_SUPPORTED = False


def open_url(url):
    """Open url in the web browser without blocking the caller."""
    # webbrowser.open blocks on me.  I'll lamely spawn threads for it,
    # hoping I'll never be so long-lived that it matters.
    t = threading.Thread(
        daemon=True,
        target=lambda: webbrowser.open(url, new=0))
    t.start()


def play_scratch():
    """may play a scratching sound if the system supports it.

    Nothing happens when alsaaudio could not be imported or the device
    does not accept the mu-law sample format.
    """
    # this could block, but it shouldn't for any significant period of
    # time, so I don't bother for now
    if not AUDIO_SUPPORTED:
        return
    p = alsaaudio.PCM()
    try:
        if (p.setformat(alsaaudio.PCM_FORMAT_MU_LAW)
                != alsaaudio.PCM_FORMAT_MU_LAW):
            return
        p.setchannels(1)
        p.setrate(44100)
        p.setperiodsize(7543)
        p.write(TWEET_SOUND)
    finally:
        # release the device on the early return and on errors, too
        p.close()


# zlib-compressed, base64-encoded sample data written to the PCM device by
# play_scratch.  The literal is split into adjacent string pieces, which
# Python concatenates at compile time.
TWEET_SOUND = zlib.decompress(base64.b64decode(
    'eJyNWU2O5LiZvcrM2O6qzowfQRAEQRAIcMEFF1xxyQPwWIZhu7uyKiMjIAiCIAgCuOCCC6645AF4DM6jsqpdXbZnLFRlKIIU+f2/91E5/1+X/u1Gv/87/uPP8fGfXN8vpb89/4+V9PtAfr/T3wbz9xPfv/6bHfO3PfSPgv/+h3+h22/iJVz5t4WKNN8W+CqAKiNKqhRjwuwYQowxa6UkrvIH4xiLOmC6FDkmrWT2QascotIBNxGfPurodfZKRikSnuIxiYz5HKu5bLCECjLxJLCy5ponia/JafzEsHDGc04HHbPCIzJrEQUECson7VXKeFBoKhXPWkISn3P5p1XIKuLZJBOEiFgnC43tEz7iIRmk1AmLRPxPmKCTEiorUbaQOUGYmPAk1ikzYYakVMaSGINZ9PufpOVxq3Cby3g8/sIU+KXc62M2vuRcZh5fMKssn45p71+Oj/eHji1iSNgOskc4Ah+pDMKoXuukRfI+Cg6fJHlESYJr4AKhsTOkxwPJh1A2y7gNOTOhdxepgKVV2iPnUbFgkvCSqsjTDosSmVnw0giR+iCSE5ZpwTT30nJPFBdZw9kKPhFHSAQWhsS8cEl5mknm2nOjsqaJxMRNTp5nzgLPVgbHs6JRCZvUHllSJCfpvfZOwJkkOpUMwigxAZ2t1dlGLqOiygUdXOYyKaZgnrBb+CFBKheUdiZCEs0wzR1eFKx4F3K4CIskQaiCESBIwK/wv2Q8WVsMqpSgPAeLAZhaa8oQzDBXdDZoRrAOAh1LBoQEYqKEt0zO2Ci5kHAD3GUtwoYzBCD8wRFI1qWyGgSU+Gp377MQFOt6xBIcb5EQEhmQA75DKGthDUKJRCC7HBGs8JPgBKHs/B7hKC4QxTKXLX3YnY9MQjQRSsZEjenYQRLKWXRuczFyxklJF4Q2QgRpl6RiPeU8ueCwmWJUljzNULloGBx2VJRggwhpjCsCE66QHYgv2FsjrkohEFxAf7tuBkpLTgnBksYcJkB0cl6S4ygn0UPMEneC4RLa7xviDnFEKOMIvfJUVBxDCHgUlFQqDwIXX0s057JYSYzj9qgzpQrF4OEKyYZ+oBJiOxdieYxRrKqSN7txCGgyFM+VYcyGI8q68BR0h9yxiASLpgARy+NH5SubxqOy5aIJvAUNYAuPJITWJTvfq94xPR6Cfa2ApQDCQuWnIzkPLQ5xy2z1VfR0rCsE52X4a9FW5ZJl8r/Ai29373MOeQ+FVFGXMpGh7l4UhlWLUXMwGzzjkyqOQZ7YfV3XDYHM6TAM9N1AezFBMWqxybH1Ubu/aVO+hsMfcPaxjfpqyAxvHdvATDEfQpRBb+FJxA8W5EW3Y4mIRw7jFdEQmMVXFgFRFkVqBAs53iOorJFDMbWDa+PhTV4i5uu2sbhffXX8odix2rspwnuAkRIO7+Z41/5Y9IinkjL6QLjin+JABE/XdURkt28bjJe/+bB45kh+/b4jggVqCZiOHEGKLaHqEV9D3xOuS7SUp/WRwi6UFEDYFoWx+rYs69cIl4ebj4peFldFfxQJSFKckkvKfNVPwL4JGwWNfYumX58LRfdie+yEXGewAn1PK7Ouu9flOUSMKjocsZST2+ZpNYmTAdYoji3pEgraBIug'
    'KfXs/VLQtChFuT7kgvehNlKI5zKAshZKmpQoSZJ0bY96CdMXh8LrxzNRspLUIAolr1GDSyjutkAiPDYwbYuUhTuUn+AQmJAAWWDM4hhdst6j1Pdd3/cMYYRKHTVH6bJOSTigo12xaQyMD1pZIApqOqfAqeiN9JpF6e8IqZ6lvg+ykU4ws3O+zDpNm6yvadqBLavzopd5XSfjM1dEkFIIQ9jMsu2o00d5R22Fz0W282YShE/buGdSE28zG/L0yNcmbmngfmVPdQhNs29dty6DvNmz2Jpqz6dkLmJp+WOg4yBfWXy7cv2TT/RX7R6XXdBbP4be/OR7ul+nCgn8hb10CwWWTf102k8be4TdZy9mtjT25C7z8CBbZemUJqAeyXTv9stWjeKBIt6469Ls0Is96H2IYtj6OzWSrO3SS13PYjWDfRaynQfv6Je0zq3t+r0L/rzS/NabfPGU7T1PVXDU0maTcm7pUssX1u+MG0BoE6HMwlrWyAc9yegnX6mFp+1USSqmpQm6YUuQjj1BDvfq1GPLj0duvbzaedCLlm4PYuBti5Red5EWn+2n+OwXkYH5+VffaW/7OCkRZ1mXwmEfLMdRCJtl98jsiYHuhZUrsXYqTfxc+TjkZaubcFPDsIEPpNO507zr4yuv6jvv+y8h0RCJZJqM10uYWqg/Id52+qcZQZyx3YeN2O38V+vM06hYrN68itUoB119Qsrwv1Cn9rsVaePrn1wHPB0Ayflpi4FK+ncSqff7k2MgsE1KJJxatlfm6ue11849fiJ5GkBjb0Rz2fD1mbN5Y/3loRovZ/O2DAuPtmbibW/3FJeacW1oyt1tFHEwi7AD+7SHVVsr9/6Di126VK8zeBDCT8SeLyHtUs2Bq4q96RORekNY6WjC0znQZlm4tMlh64e5UhC2TTE3m4dr2ZSuDPUJgB8Q5YjazmR5nxnbPi0ierV7JDVI3Nvudheomdi1DaCIXApvelrYn+dEeHJ53uddy13uZoXxPWu593RIeuD7yC/ESUaC6FpUIRZvm+gcHD6H2IFryj5EC1gBx13GNwZiKWqURwKar1u37oHNxonBgs3vo2Ad1YQPwtgtilVT7d4CyTmQn3s2sAGWiKoHToFTPgxKy+h7KWB5HrIcnjuvgIp2dnbcERR9ALVhctkAdqi2ZzBbZ4Sabw6lzmZu7rNyWxcMrykVxS9TsF732kZGMyppNm9GFjx08XqBVfpBravqsgFn1U1NAU4BkJR4NFOue8XaRvU/K+tZfiwS0c7Juuam3zwZ0rgCVn0Kc2zb5AWSAqtYQUU0oIqgr1uAaUeL6mVDXFZO7BoIlxTVKLBOwUcgGELsm6I0iQHY0BMR932GjDxY8Ct0CugCNIhd10nUXnAg2M77wiKBYyEDKwQgYlMEJTp46/dMWQs0t6g5HRtoCOscbeGnaQU6KtmCBGTlPfquAbL20k3jakyxrMESWjMJnBqIQp8nC2XOGNz2ZQ/Oa07AmQsYABLEIAvZNGBnQCoJcEfbV/gxhkA7C+tOqJNwJYJFRiCPLsAJoFQgyWjigqJlNGd6aUUqg7ngFOhtAD4vI7A4OhDhIAnWQxME1RSaQ01oP0gGdEFXs5VGArGVwr6ty2KTWTfAhue0a7qhJ1U3QABZusl18mitMogeKE8I5YfdbxsQEboMQ3M5XZt+YGgPtkK5NrOHoir6hFyMXdo9ycBDupamwuMB1+O8GtgF3SqgUIELAfoFILE99ZS28Ni6L/f7PD4mGBCIy0h9qbq2K3Q6l7YR+BHNvk7TOIP0UdI9P52rrmm41g6PjvO0bXMhZV5TGJ23XVfVl7owSfjt8Vinx7o6F4e+o0M9oKYF60DTXCmVoFxoHIQmgGPSCm29ebzNPgjet4R0DaNwdrIRYeMLEc3Ih7q6DgNDy6clGF6xMO5QljvW8TBP49t93BeXsB1HlJVzBngQcYjMAcc61dxt5n4fH6uTSPshgfRty7a+zgEtSO6b67muW0TG'
    'vsRh6K9XcIfst2kO2mwIASk0ykXyM8Dfiuvp6bmmiJ0ctyBRRVxmUG1flsdsVt/2jGhTiMqS2u7y1NfPPLp5fvs83x4pSkwHMR4dB1OiJRCSADlxmlbnYcjev7y87VpXfVejPobU1qxuK/S8RprCaAkLaXL7fgc7GnR7uYouoCvdAPIoEm6x0wLBmAWFMtHUouvqZoD1Z/vZTNZMI3znXQd36x5d3t6SoRWdH6cR7K9+qru8qtWS9uO5TXr+5T57WQ3nE5c9AFqKdVqsyx0hKc7W8fZ/Ppxwv71tCW3/si/WcoCkCDpStNF6/vTpLeSh+tjK0gaBkE2LMZldmEa9NrQrjfG2HWTf3xcUgJaZh81gYmkZje7+9N8nopb7hkiwq+lP1wu1MDNK1+OxaXY6MQkgvxKzjFN+PgNZdg3Xry8TyiRZ96GtztLHxbZDKTdNy/jyQFdcx9lbDargFtWTVhul+0u0jxcfSZNkggOU+fvnX+4JeZuq6g9PPL/Oa76yNieZW6iPxL1Stt2nZaBnxpRuhksIt2nO4JlRuI6ftX4dQR16qcALWf/ctOAsyfLNvSw3Tp+qXsNlLHbG3D4/Hjx1Ug3q1F06G16BJxXA4pe317W6/vEPROyft32jwgMHGFFtC4bC6aWDYR83lBhwWh87JGpV6RnxBU5KhH3c3XC59gjB0g54D6KP6nS0DFJL5PZAtFuQtOPsMmmvVVNOILTIYRvHJbG2JXq932b6/LGK83i/TQ5tCrKV1CgJ1+pa5fH115ddd7Vy88OC6ZKeNEAGv8PAdZ/WxxL66+l8OV1qud4+jx4YMd1nwHGH5Pjw1MX5PgcwaBQda+bbbdbNuWKx9HVoC+Brc//1by/7cHo61YUw7PcvtyUzEZZx9ugwhq4+nypup3EHgZMouqgdAAh2/sN//VQJiJuPhgJFPpllcfT84eOVBTS924xCGEh1LWI+1oQajo6IEpbW28uXyUMAu0yzzaX9LLvUcnn5y5//ettSaff40NQtKYcjfd1Q/3j59fMEsnS9Xi6nE/DDzUXAtqoJyjdaBVlgp69Pz+eGA0smo4empdmuqNioSeieSI1a2+ft/nqbLDoXHcshUcEG9BRsaFsap89/+8vfXnfRnBHupJxHRLs87vdpT4Id3bpJtHr68PFC4nK/3d4eBQ80bc8///xccz/fb48tqIJ/aIezpH3bvbeBHAV8vX/+9HI3ovr4xz9+vPSqHIUYtNT4F5N6PzApJ4oCRbmhEPTLy5eHke356efTtUNaA15QkR/T6vQAg51rms08zRtwGkgZAE/jbBLrm6btqSzNNuY/7o9pA61qLpd60Ga8fXlFNZDQF+7EI9PqZXd9fr60LO2YPc5bOa6gQJiubTssFLYJFR7Thvp6gV3kcX7G+/p6bYjy2zIVibDBGViHWuVKCx70+4kPDIHZPr2fGaDvBpQVJL2/fn59lKataFczP3359e+/vLytub88PVeDttPrp0+vKJFDc700FB3e7fUVYMbbK5zTILO2FZc59jkOI76dj2v17di+nAcq8JJja47GW72f3sDW0K4cSKCU++OE4H3wOCEHBS4nGOUIoFyF0pQjAlxo9dGhG1tOD8vpDzr9vRzEAfdg/n28l0gptuOlWTamnOM4f5yPlQNqC8KCxGVtBfmzKUbdI2Kkg5GPaFxn2F52p58/nvu0PV5vo1VddT7DcaDdtgx70Tz/9OHUwezrXpJPArcgLjuOCOAUYKRG4x8S6B4ULIfNrpzrIUiRD2m5fblviQ+w4rWXfn17+XT3tK3rumOgGHabph1cB0YBl6BYDBYqir3dN3Z5PtedNOP99dPnrVCEjoD6JYAOYs2jP0dSkcu5GRgYwfSY9dBVzx044rJACDRuDhbjPWsvg1Lr27atvGmenpEsyF8rSdJ5yyI11/6Kfvj1yzLzuvu54WRzYeMh6DWa1LFnbDDur/fddqKRQhk0xZzEbXrbt4E/dZVWJgDn/LaD5l8v'
    'aLjG8fM86yhsLOW11WIx4CuBloOUASRA5fE2TqIHP9ZCkh6Pjivs2menJWkatSzLngZ0bhRkUO3jbZFtw3OUQ0ej2ZwW6BVyfz6h4I4oYMGiVtWNQqB0Is2oVJeGFxKNsrGKqpFB9y26vC2ievkeHTqcF9fRsE6E3DVyW2PeHrY9E7THcr5ByXkjlXCa2dd7QoPTdzn1dBtDnLa2jqHl4yrsqx1yrinY0zSKMLKTTlW65zSCCpFr4MN9I5NpA7+CVbyJcK9VfJKUT2TZ2xmWuFqYmzwuum1FP9ZvkHRgjaleyZ1u373h+vb+6vv3aO/pl38b+P411m+vrn54M/bt9dh3k75/M6d/eK/27dD19zv8eB773U///OP/89btx+F/mv7Dmvrfr6f/8al/9/N3J8c6/6D0t5eF/3iB+d3ody8of/CE/sFKv837/dr/yfX+tvN/AefOwlo='
))


def _save_last_id(last_id):
    """Persist the id of the newest tweet seen to LASTID_PATH."""
    with open(LASTID_PATH, "w") as f:
        f.write("%s" % last_id)


def _load_last_id():
    """Return the tweet id stored in LASTID_PATH.

    Returns 0 when the state file is missing, unreadable or does not
    contain an integer.
    """
    try:
        with open(LASTID_PATH) as f:
            return int(f.read())
    except (OSError, IndexError, ValueError):
        # OSError covers the first run, when the file does not exist yet
        return 0


def follow_redirects(url):
    """Follow up to ten HTTP redirects starting at url.

    Returns the final URL.  As soon as a redirect target mentions
    "/twitter.com", that target is returned with the host rewritten to
    mobile.twitter.com.
    """
    for _ in range(10):
        resp = requests.head(url, allow_redirects=False, timeout=5)
        if not 300 <= resp.status_code < 400:
            break
        location = resp.headers.get("location")
        if not location:
            # a 3xx without a target (e.g. 304) cannot be followed
            break
        # Location may be relative to the URL just requested
        url = urlparse.urljoin(url, location)
        if "/twitter.com" in url:
            return url.replace("/twitter.com", "/mobile.twitter.com")
    return url


class Tweet(object):
    """A Tweet, containing the status as coming from tweepy, and
    various parsed items.

    Constructing a Tweet raises SkipThis for retweets, for statuses
    containing one of STOP_STRINGS, and for authors with "Trend" in
    their name.
    """

    def __init__(self, status):
        if status.retweeted or status.text.startswith("RT"):
            print("Skipping retweet {}".format(status.id))
            raise SkipThis()
        for stop_string in STOP_STRINGS:
            if stop_string in status.text:
                raise SkipThis()

        self.status = status
        self.id = status.id
        self.screen_name = status.author.screen_name
        self.user_name = status.author.name
        if "Trend" in self.user_name:
            raise SkipThis()

        self.location = status.author.location
        self.timestamp = status.created_at
        self.text = status.text
        self.urls = [u["expanded_url"] for u in status.entities["urls"]]
        # NOTE(review): the media URLs may live under
        # entities["media"][i] rather than directly on entities -- confirm
        # against the Twitter API before relying on this branch.
        if status.entities.get("media_url"):
            self.urls.append(status.entities["media_url"])
        elif status.entities.get("media_url_https"):
            self.urls.append(status.entities["media_url_https"])

    def get_user_line(self):
        """Return "screen_name/user_name -- location" for this tweet."""
        return "%s/%s -- %s" % (
            self.screen_name, self.user_name, self.location)

    def print_as_text(self):
        """Print the user line and the wrapped tweet text to stdout."""
        print("(%s)" % self.get_user_line())
        print(textwrap.fill(self.text, width=70,
            initial_indent=" ", subsequent_indent=" "))


class TwitterRequester(object):
    """Runs searches through tweepy and maintains the author blacklist."""

    def __init__(self, consumer_key, consumer_secret):
        auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
        self.api = tweepy.API(auth)
        self._load_blacklist()

    def retrieve_new(self, searchterm):
        """returns a list of twitter items that came up since the last call.

        This uses a file on disk to maintain state.
        """
        last_id = _load_last_id()
        new_tweets = self.api.search(searchterm, since_id=last_id)
        max_id = last_id
        tweets = []
        for status in new_tweets:
            # advance past skipped statuses as well, so they are not
            # fetched (and reported as skipped) again on every poll
            max_id = max(max_id, status.id)
            if status.author.name in self.blacklist:
                print("ignoring from {}".format(status.author.name))
                continue
            try:
                tweets.append(Tweet(status))
            except SkipThis:
                pass
        _save_last_id(max_id)
        return tweets

    def _load_blacklist(self):
        """Read the blacklisted author names from BLACKLIST_PATH."""
        try:
            with open(BLACKLIST_PATH) as f:
                # drop empty lines (e.g. from a trailing newline)
                self.blacklist = {
                    name for name in f.read().split("\n") if name}
        except IOError:
            self.blacklist = set()

    def save_blacklist(self):
        """Write the blacklist to BLACKLIST_PATH, one name per line."""
        with open(BLACKLIST_PATH, "w") as f:
            f.write("\n".join(self.blacklist))

    def add_to_blacklist(self, name):
        """Blacklist the author name and persist the change."""
        self.blacklist.add(name)
        self.save_blacklist()


try:
    import tkinter

    class SoftScrollingText(tkinter.Frame):
        """A widget that lets you add messages that then will scroll in
        smoothly.
        """

        def __init__(self, master, requester, **kwargs):
            tkinter.Frame.__init__(self, master, **kwargs)
            self.master = master
            self.requester = requester
            self.items = []         # (canvas item id, container frame)
            self.item_queue = []    # tweets waiting to be displayed
            self.adder_id = None    # after() id of the pending feed timer
            self.scroll_by_callback = None

            self.canvas = tkinter.Canvas(self)
            self.canvas.config(yscrollincrement=1)
            self.canvas.grid(row=0, column=0,
                sticky=tkinter.NE+tkinter.SW)
            self.scrollbar = tkinter.Scrollbar(self,
                command=self.canvas.yview)
            self.canvas.config(yscrollcommand=self.scrollbar.set)
            self.scrollbar.grid(row=0, column=1,
                sticky=tkinter.NE+tkinter.SW)
            self.rowconfigure(0, weight=1)
            self.columnconfigure(0, weight=1)

            self.auto_scroll = True
            self.cur_y = 0
            self.max_items = 100
            self.feed_delay = 2500  # ms between two displayed tweets

            self.bind("q", lambda ev: self.quit())
            # NOTE(review): the event sequence was an empty string in the
            # source at hand, which Tk rejects; presumably "<space>" got
            # lost somewhere -- confirm the intended key.
            self.bind("<space>", lambda ev: self._toggle_auto_scroll())
            self.focus()

        def _set_selection(self, content):
            """Put content on the clipboard and the PRIMARY selection."""
            self.selection = content
            self.clipboard_clear()
            self.clipboard_append(content)
            self.selection_own(selection="PRIMARY")
            self.selection_handle(lambda *args: self.selection,
                selection="PRIMARY")

        def _crop_items(self):
            """Drop the oldest items beyond max_items while auto-scrolling."""
            if self.auto_scroll and len(self.items) > self.max_items:
                items_to_delete = self.items[:-self.max_items]
                self.items = self.items[-self.max_items:]
                for item, widget in items_to_delete:
                    widget.destroy()
                    self.canvas.delete(item)

        def _toggle_auto_scroll(self):
            self.auto_scroll = not self.auto_scroll

        def add_tweet(self, tweet):
            """Append a frame for tweet (text label plus one button per
            URL) below the existing items and scroll to it if enabled.
            """
            container = tkinter.Frame(self.canvas)
            formatted_text = "{}\n{}\n{}".format(
                tweet.get_user_line(), tweet.text, tweet.id)
            label = tkinter.Label(container,
                text=formatted_text,
                relief=tkinter.SUNKEN,
                justify=tkinter.LEFT,
                anchor=tkinter.NW,
                wraplength=max(200, self.canvas.winfo_width()),
                font=('helvetica', 10))
            # clicking a tweet copies its id
            label.bind("<1>",
                lambda ev, tweet_id=tweet.id: self._set_selection(tweet_id))
            label.tweet = tweet
            label.pack(fill=tkinter.BOTH, expand=True, side=tkinter.TOP)

            for u in tweet.urls:
                parsed = urlparse.urlparse(u)
                but = tkinter.Button(container, text=parsed.hostname,
                    command=lambda u=u: open_url(u))
                but.pack(fill=tkinter.BOTH, expand=True, side=tkinter.TOP)

            item = self.canvas.create_window(0, self.cur_y,
                window=container, anchor=tkinter.NW)
            # let Tk lay out the new window so bbox() is meaningful
            self.update_idletasks()
            self.cur_y = self.canvas.bbox(item)[-1]
            self.items.append((item, container))
            self.canvas.config(scrollregion=self.canvas.bbox(tkinter.ALL))

            canvas_lower = self.canvas.canvasy(self.canvas.winfo_height())
            if self.auto_scroll and self.cur_y > canvas_lower:
                self.scroll_by(self.cur_y-canvas_lower)

        def _promote_from_queue(self):
            """Display one queued tweet and re-arm the feed timer while
            the queue is non-empty.
            """
            self.adder_id = None
            try:
                if self.item_queue:
                    try:
                        self.add_tweet(self.item_queue.pop())
                    except tkinter.TclError:
                        # presumably weird chars, just take next
                        pass
            finally:
                if self.item_queue:
                    self.adder_id = self.after(
                        self.feed_delay, self._promote_from_queue)

        def add_many(self, items):
            """Queue items for display and start feeding if idle."""
            self.item_queue.extend(items)
            if self.item_queue and not self.adder_id:
                # _promote_from_queue maintains adder_id itself; its
                # return value (None) must not overwrite the timer id
                self._promote_from_queue()
            self._crop_items()

        def scroll_by(self, diff):
            """Scroll the canvas down by diff units, a third of the
            remaining distance every 70 ms.
            """
            if diff < 2:
                self.canvas.yview(tkinter.SCROLL, int(diff), tkinter.UNITS)
                self.scroll_by_callback = None
            else:
                scroll_now = max(1, int(diff/3))
                self.canvas.yview(tkinter.SCROLL, scroll_now, tkinter.UNITS)
                self.scroll_by_callback = self.after(70,
                    lambda: self.scroll_by(diff-scroll_now))

except ImportError:
    # no tkinter, no problem: main() falls back to the text loop
    tkinter = None


def loop_text(requester, term):
    """Poll for term every 100 seconds forever, printing new tweets."""
    while True:
        print("\n/=========== %s " % datetime.datetime.now().strftime(
            "%H:%M (%Y-%m-%d)"))
        tweets = requester.retrieve_new(term)
        if tweets:
            play_scratch()
        for tweet in tweets:
            tweet.print_as_text()
            print("")
        time.sleep(100)


def loop_GUI(requester, term, opts):
    """Run the tkinter GUI, polling for term once per minute.

    opts needs the attributes noisy (play a sound on new tweets) and
    level (do not raise the window on new tweets).

    Raises RuntimeError when tkinter is not available.
    """
    if tkinter is None:
        raise RuntimeError("tkinter is not available")

    root = tkinter.Tk()
    sk = SoftScrollingText(root, requester)
    sk.pack(expand=True, fill=tkinter.BOTH)

    def update():
        try:
            tweets = requester.retrieve_new(term)
            if tweets and opts.noisy:
                play_scratch()
            if tweets and not opts.level:
                root.lift()
            sk.add_many(tweets)
        finally:
            # keep polling even when this round failed
            root.after(60000, update)

    root.title("twitupdate")
    root.geometry("250x768-0+0")
    update()
    root.mainloop()


def parse_command_line():
    """Parse sys.argv; returns (options, search term).

    Exits with a usage message when no hashtag was given.
    """
    parser = optparse.OptionParser(usage="%prog [options] HASHTAG")
    parser.add_option("-b", "--beep", help="make noise when displaying"
        " new tweets", dest="noisy", action="store_true")
    parser.add_option("-l", "--level", help="do not raise window when displaying"
        " new tweets", dest="level", action="store_true")
    opts, args = parser.parse_args()
    if not args:
        parser.error("missing hashtag to monitor")
    term = args[0]
    return opts, term


def main():
    opts, term = parse_command_line()
    # '#' starts a comment in most shells, so accept the tag with or
    # without the hash
    if not term.startswith("#"):
        term = "#"+term
    requester = TwitterRequester(APP_KEY, APP_SECRET)
    if tkinter is None:
        loop_text(requester, term)
    else:
        loop_GUI(requester, term, opts)


if __name__ == "__main__":
    main()