#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
This script downloads stuff from arte plus 7; without an argument, it
will let you select a clip in a curses interface.
It will download the movie to .flv.
New style means that there's an embedded link to some json defining all
the available streams, and we select one based on our options.
Note: as of May 2022, streams will have bad audio if dumped with an
ffmpeg earlier than a certain version (the exact version number has been
lost from this note). To cope with this, grabarte looks in
/usr/local/ffmpeg/ffmpeg; if that's executable, we tell streamlink
to use that for muxing.
"""
import contextlib
import curses
import datetime
import glob
import locale
import os
import pprint
import re
import sys
import subprocess
import textwrap
import urllib.request, urllib.parse, urllib.error
import urllib.parse
from curses import textpad
try:
import json
except ImportError:
import simplejson as json
import requests
# set to True for development; makes get_with_cache keep HTTP responses in
# local .cache files (caching of the per-day program json below is
# independent of this flag)
CACHE_RESULTS = False
# where we keep the cached json program files (honours XDG_CACHE_HOME)
CACHE_DIR = os.path.join(
    os.environ.get("XDG_CACHE_HOME",
        os.path.expanduser("~/.cache")),
    "grabarte")
# after how many days to remove cached program files
DISCARD_AFTER = 21
# where to pull Arte's JSON program files from
ARTE_API_URL = "https://www.arte.tv/api/rproxy/emac/v3/de/web/pages/TV_GUIDE"
# where to get JSON including an API URL
# (this doesn't look terribly stable -- is there a way to find this URL?)
ARTE_CONFIG_URL = ("https://static-cdn.arte.tv/static/"
    "artevp/5.1.3/config/json/general.json")
# User-Agent header sent with every HTTP request; change to whatever you like
USER_AGENT = "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36"
class SkipThis(Exception):
    """Raised to signal that a program item should be silently skipped."""
class StatusDisplay:
    """A context manager maintaining a single, repeatedly overwritten
    status line on a terminal-like stream.
    """
    def __init__(self, dest_f=sys.stdout):
        self.dest_f = dest_f
        self.clearer = "\r\n"

    def update(self, new_content):
        """replaces the current status line with new_content."""
        out = self.dest_f
        out.write(self.clearer+new_content)
        out.flush()
        # blanking sequence wide enough to wipe what we just wrote
        self.clearer = "\r"+" "*len(new_content)+"\r"

    def __enter__(self):
        self.dest_f.write(self.clearer)
        self.dest_f.flush()
        return self

    def __exit__(self, *exc_info):
        self.dest_f.write("\r\n")
        self.dest_f.flush()
def get_with_cache(url, bypass_cache=False, **moreheaders):
    """returns the body of url (as bytes), possibly from a local cache file.

    The cache is only consulted and filled when the module-level
    CACHE_RESULTS flag is on; bypass_cache forces a re-fetch (the fresh
    content still lands in the cache).  moreheaders are added to the HTTP
    request headers on top of our User-Agent.
    """
    # derive a filesystem-safe cache file name from the URL
    # (raw string: "[^\w]+" would raise an invalid-escape warning)
    cache_name = re.sub(r"[^\w]+", "", url)+".cache"
    if not bypass_cache and CACHE_RESULTS and os.path.exists(cache_name):
        with open(cache_name, "rb") as f:
            return f.read()
    headers = {"User-Agent": USER_AGENT}
    headers.update(moreheaders)
    req = requests.get(url, headers=headers)
    doc = req.content
    if CACHE_RESULTS:
        # context managers so the handles don't linger until GC
        with open(cache_name, "wb") as f:
            f.write(doc)
    return doc
def label_to_langcode(label):
    """returns a short language code for a version[label] value on ARTE.

    Runs of characters outside a-z (e.g. the umlaut in "Französisch")
    are collapsed to a dash before mapping the German language names.
    """
    normalised = re.sub(r"[^a-z]+", "-", label.lower())
    # BUG FIX: the sub above turns "französisch" into "franz-sisch", so the
    # old replacement target "franzsisch" could never match; we keep the
    # dash-less variant too in case a label arrives already umlaut-stripped.
    return (normalised.replace("deutsch", "de")
        .replace("franz-sisch", "fr")
        .replace("franzsisch", "fr"))
class ArteItem(object):
    """a wrapper class for a downloadable Arte file.
    """
    def __init__(self, title, desc, duration, player_url, broadcast_date):
        self.title = title
        self.desc = desc
        self.duration = duration
        self.player_url = player_url
        # broadcast_date may arrive as an ISO-ish string or as an
        # already-usable value; only strings get parsed.
        if isinstance(broadcast_date, str):
            broadcast_date = datetime.datetime.strptime(
                broadcast_date, "%Y-%m-%dT%H:%M:%SZ")
        self.broadcast_date = broadcast_date

    @classmethod
    def from_program_json(cls, item_dict):
        """returns an ArteItem built from ca. 2018-style descriptors
        from their JSON feed.

        Raises SkipThis for items we do not want to offer (clips and
        items without a duration).
        """
        if item_dict["kind"]["code"]=="CLIP":
            raise SkipThis()
        # unfortunately, availability isn't properly set.
        #if not item_dict["availability"]["hasVideoStreams"]:
        #    raise SkipThis()
        if not item_dict["duration"]:
            raise SkipThis()
        description = (item_dict.get("fullDescription")
            or item_dict.get("shortDescription")
            or "No description")
        instance = cls(
            title=item_dict["title"],
            desc=description,
            duration=item_dict["duration"],
            player_url=item_dict["url"],
            broadcast_date=item_dict.get("broadcastDates", [0])[-1])
        # keep the raw descriptor around for debugging/inspection
        instance.item_dict = item_dict
        return instance
class ArteProgram(object):
    """a facade for Arte's program pages.

    This includes requesting the json files, caching and discarding them
    locally, and turning them into an ordered stream of ArteItems.

    The caching will probably be... suboptimal... in time zones far away
    from UTC. I suppose we should be forcing the Strasbourg time zone.

    Pass expire=False to skip discarding of cached program items we
    deem stale.
    """
    def __init__(self,
            expire=True,
            cache_dir=CACHE_DIR,
            discard_after=DISCARD_AFTER,
            api_url=ARTE_API_URL):
        self.cache_dir = cache_dir
        # exist_ok avoids the isdir/makedirs race when two instances
        # start up concurrently
        os.makedirs(self.cache_dir, exist_ok=True)
        self.discard_after = discard_after
        self.api_url = api_url
        if expire:
            self._expire()
def _parse_name(self, file_name):
    """returns a datetime.date for one of our cache file names.

    This crashes for invalid file names on purpose; people should clean
    up their cache dirs if there's junk in there.
    """
    mat = re.search(r"/(\d\d\d\d)-(\d\d)-(\d\d).arte-json", file_name)
    year, month, day = (int(part) for part in mat.groups())
    return datetime.date(year, month, day)
def _get_cache_name(self, date):
    """returns a full path to write the cache file for date to.
    """
    file_name = "{}.arte-json".format(date.isoformat())
    return os.path.join(self.cache_dir, file_name)
def _expire(self):
    """removes stale files from the cache.

    Stale are either files older than self.discard_after days, or files
    retrieved on the very day they are for (those may still change on
    the server after we fetched them).
    """
    today = datetime.datetime.utcnow().date()
    for file_name in glob.glob(os.path.join(self.cache_dir, "*.arte-json")):
        for_date = self._parse_name(file_name)
        # BUG FIX: .date was missing its call parentheses, so this
        # compared a date against a bound method and same-day files
        # were never expired.
        retrieved_on = datetime.datetime.utcfromtimestamp(
            os.path.getmtime(file_name)).date()
        if for_date==retrieved_on:
            os.unlink(file_name)
        # elif, so a file matching both conditions isn't unlinked twice
        elif (today-for_date).days>self.discard_after:
            os.unlink(file_name)
def _retrieve_for(self, date):
    """returns the json program content for a datetime.date.

    This will be the cached content if it's available, else we
    go to arte, pull it from there, and cache it.
    """
    cache_name = self._get_cache_name(date)
    if os.path.isfile(cache_name):
        with open(cache_name, "rb") as src:
            return src.read()
    response = requests.get(self.api_url, {"day": date.isoformat()}, timeout=3)
    response.raise_for_status()
    with open(cache_name, "wb") as dest:
        dest.write(response.content)
    return response.content
def get_items_for(self, date):
    """yields the arte program items for date.
    """
    try:
        raw_data = json.loads(self._retrieve_for(date))
    except ValueError:
        import traceback
        traceback.print_exc()
        sys.stderr.write("No data for %s\n"%date)
        return
    guide = raw_data.get("value", raw_data)["zones"][1]
    if guide["displayOptions"]["itemTemplate"]!="guide":
        sys.stderr.write("No guide? %s\n"%repr(guide["displayOptions"]))
    for raw_item in guide["data"]:
        try:
            yield ArteItem.from_program_json(raw_item)
        except SkipThis:
            pass
def get_items_since(self, days):
    """returns program items for the last days days.

    This will retrieve program json for this many days if necessary.
    """
    # NOTE(review): the source text from here until the menu methods
    # further down appears corrupted -- everything between a literal "<"
    # and the next ">" seems to have been stripped (likely by an HTML
    # filter).  The remainder of this method, an entire curses
    # mixin/menu class, and the head of an adjust_window method are
    # missing; the fragments below do not form runnable code as-is.
    # TODO: restore this region from version control.
    today = datetime.datetime.utcnow().date()
    date = datetime.datetime.utcnow().date()
    items = []
    while (today-date).daysself.height-2:
        self.cur_top += self.height-1
        self._redraw_internal()
def move_to(self, index):
    """moves the cursor to item index, clamped into the valid range.
    """
    with self._change_line():
        # also clamp at 0 so a negative index cannot propagate into
        # the cursor position
        self.cur_item = max(0, min(index, len(self.items)-1))
        self.adjust_window()
def page_down(self):
    """advances the cursor by one screenful, stopping at the last item.
    """
    with self._change_line():
        last_index = len(self.items)-1
        self.cur_item = min(last_index, self.cur_item+self.height-1)
        self.adjust_window()
def page_up(self):
    """moves the cursor back by one screenful, stopping at the first item.
    """
    with self._change_line():
        candidate = self.cur_item-self.height+1
        self.cur_item = candidate if candidate > 0 else 0
        self.adjust_window()
def go_down(self):
    """moves the cursor down one line if possible.
    """
    # NOTE(review): the original text of go_down/go_up was garbled (the
    # two methods were fused, apparently by a filter eating "<...>"
    # spans); this is a reconstruction of the evident intent.  go_up is
    # restored because _process_KEY_UP below calls it.
    with self._change_line():
        if self.cur_item < len(self.items)-1:
            self.cur_item += 1
        self.adjust_window()

def go_up(self):
    """moves the cursor up one line if possible.
    """
    with self._change_line():
        if self.cur_item > 0:
            self.cur_item -= 1
        self.adjust_window()
def adjust_for_string(self, phrase, start_at):
    """moves the cursor to the first item at or after start_at whose
    title contains phrase; flashes the screen if there is none.
    """
    phrase = phrase.strip()
    for offset, item_title in enumerate(self.items[start_at:]):
        if phrase not in item_title.lower():
            continue
        with self._change_line():
            self.cur_item = start_at+offset
            self.adjust_window()
        self.window.refresh()
        break
    else:
        # no match below start_at: complain visually
        curses.flash()
class UpdatingSearch(textpad.Textbox):
    """The mainloop while searching.

    This wraps a curses Textbox so that after every keystroke the menu
    selection follows the current search string; ESC resets the search.
    """
    def __init__(self, menu, win):
        self.menu = menu
        self.last_search = ""
        # remember where the search started so ESC can return there
        self.start_item = self.menu.cur_item
        textpad.Textbox.__init__(self, win)
        self.stripspaces = False

    def do_command(self, ch):
        try:
            if ch==curses.ascii.ESC:
                self.last_search = ""
                self.menu.adjust_for_string("", self.start_item)
                return 0
            return textpad.Textbox.do_command(self, ch)
        finally:
            # whatever happened above, re-read the edit line and let the
            # menu track the (lowercased) search string
            _, cursor_col = self.win.getyx()
            self.last_search = self.win.instr(0, 0, cursor_col).decode("utf-8")
            self.menu.adjust_for_string(self.last_search.lower(), self.start_item)
            self.win.move(0,0)
            self.win.addstr(self.last_search)
class CursesUI(CursesMixin):
    """The curses selection UI.

    Use as a context manager (initscr is expected to happen in the
    mixin's __enter__) and then call choose().
    """
    def __init__(self, items):
        self.items = items
        self.window = None
        self.last_search = None
        # defensive defaults; presumably CursesMixin (source not visible
        # here) also sets these up -- TODO confirm
        self.go_on = True
        self.selection = None

    def _process_q(self):
        # quit without selecting anything
        self.go_on = False

    def _process_g(self):
        # grab: quit and remember the current item
        self.go_on = False
        self.selection = self.menu.cur_item

    def _process_0(self):
        self.menu.move_to(0)

    def _process_n(self):
        # repeat the previous search starting just below the cursor
        if self.last_search is not None:
            self.menu.adjust_for_string(
                self.last_search.lower(),
                self.menu.cur_item+1)

    def _process_KEY_UP(self):
        self.menu.go_up()
    _process_k = _process_KEY_UP

    def _process_KEY_DOWN(self):
        self.menu.go_down()
    _process_j = _process_KEY_DOWN

    def _process_KEY_PPAGE(self):
        self.menu.page_up()
    _process_CTRLF = _process_KEY_PPAGE

    def _process_KEY_NPAGE(self):
        self.menu.page_down()
    _process_CTRLB = _process_KEY_NPAGE

    def _process_KEY_ENTER(self):
        # pop up a centred box showing the current item's description
        win_width = 40
        text = self.items[self.menu.cur_item].desc or "(no description)"
        description = textwrap.wrap(
            text, win_width-2)[:self.height-2]
        subwin = self.window.subwin(len(description)+2, win_width,
            self.height//2-len(description)//2, self.width//2-20)
        subwin.erase()
        for ln, content in enumerate(description):
            subwin.addstr(ln+1, 1, content.encode(self.coding, "ignore"))
        subwin.box()
        subwin.getkey()
        self.menu.redraw()

    def _process_KEY_RESIZE(self):
        self._on_window_resized()
        self.menu._on_window_resized()
        self.menu.redraw()

    def _process_KEY_SLASH(self):
        subwin = self.window.subpad(1, 80, self.height-1, 0)
        pad = UpdatingSearch(self.menu, subwin)
        curses.curs_set(1)
        try:
            pad.edit()
            self.last_search = pad.last_search
        finally:
            curses.curs_set(0)
            subwin.clear()

    def _main_loop(self):
        while self.go_on:
            try:
                key = self.window.getkey().replace("^", "CTRL"
                    ).replace("\n", "KEY_ENTER"
                    ).replace("/", "KEY_SLASH")
            except curses.error: # buggy curses, try again
                # BUG FIX: this was "pass", which fell through to the
                # dispatch below with key unbound (NameError on the
                # first iteration) or stale.
                continue
            # self._paint(70, 0, str(key), curses.A_STANDOUT)
            getattr(self, "_process_"+str(key), lambda: None)()

    def choose(self):
        """runs the main loop and returns the selected item (or None
        when the user quit without grabbing).
        """
        assert self.window is not None
        self.menu = CursesMenu(self.window, [
            "%s (%s min)"%(i.title, i.duration//60) for i in self.items])
        self.menu.redraw()
        self._main_loop()
        if self.selection is not None:
            return self.items[self.selection]
class Stream:
    """A class encapsulating a stream.

    This is abstract; concrete subclasses must provide a constructor
    and a retrieve(dest_name) method.
    """
class HTTPStream(Stream):
    """A stream consisting of a single HTTP URI.
    """
    def __init__(self, uri):
        self.uri = uri

    def retrieve(self, dest_name):
        """dumps the stream behind our URI into the local file dest_name.
        """
        # BUG FIX: this method used self.url (and a bare "url"), but the
        # constructor only ever sets self.uri.
        print("\nDumping %s to %s\n"%(self.uri, dest_name))
        # ok, so we perhaps should store protocol from metadata. Or look
        # at the HTTP content-type. But for now the URI form is good enough.
        if self.uri.endswith("m3u8"):
            # NOTE(review): retrieve_m3u8_stream is not defined anywhere
            # in the visible source; it was possibly lost to the same
            # corruption seen elsewhere in this file -- TODO confirm.
            retrieve_m3u8_stream(self.uri, dest_name)
        else:
            dump_cmd = ["curl",
                '-o', dest_name, self.uri]
            for retry in range(2):
                sub_proc = subprocess.Popen(dump_cmd)
                sub_proc.wait()
                if sub_proc.returncode==0:
                    break
                # (message typo "dowloader" fixed)
                print("downloader failed, trying again.")
class M3UStream(Stream):
    """A stream described by an m3u(8) playlist, optionally with a
    separate audio playlist that is muxed in via ffmpeg on retrieval.
    """
    def __init__(self, video_uri, audio_uri=None):
        self.video_uri, self.audio_uri = video_uri, audio_uri

    def _retrieve_m3u(self, m3u_url, dest_name):
        """concatenates all segments of the playlist at m3u_url into
        dest_name, showing a one-line progress display.
        """
        lines = get_with_cache(m3u_url).decode("utf-8").split("\n")
        n_bytes = 0
        # in Dec 2020, arte at least for some shows used byte ranges
        # as segments. We ignore those and just fetch the whole
        # file -- but that only once.
        urls_retrieved = set()
        with StatusDisplay() as disp:
            with open(dest_name, "wb") as f:
                for index, ln in enumerate(lines):
                    if not ln.strip() or ln.startswith("#"):
                        continue
                    if not ln.startswith("http"):
                        ln = urllib.parse.urljoin(m3u_url, ln)
                    if ln in urls_retrieved:
                        continue
                    src = requests.get(ln, stream=True)
                    while True:
                        content = src.raw.read(2**20)
                        if content==b"":
                            break
                        f.write(content)
                        n_bytes += len(content)
                        # BUG FIX: kilobytes were computed as //1014
                        disp.update("{}/{}: {}k".format(
                            index+1, len(lines), n_bytes//1024))
                    urls_retrieved.add(ln)

    def retrieve(self, dest_name):
        """writes all parts mentioned in our playlist(s) to dest_name.

        Again, we don't properly parse the m3u8 for now. We may want to if
        and when arte puts in ads.
        """
        if not self.audio_uri:
            self._retrieve_m3u(self.video_uri, dest_name)
            return
        # separate audio stream: fetch both parts, then mux with ffmpeg
        audio_dest = "audio{}.mp4".format(os.getpid())
        video_dest = "video{}.mp4".format(os.getpid())
        try:
            print("Getting audio...")
            self._retrieve_m3u(self.audio_uri, audio_dest)
            print("Getting video...")
            self._retrieve_m3u(self.video_uri, video_dest)
            subprocess.check_call(["ffmpeg",
                "-i", video_dest, "-i", audio_dest,
                "-codec", "copy", dest_name])
        finally:
            # best-effort cleanup of the intermediate files
            for temp_name in (audio_dest, video_dest):
                if os.path.exists(temp_name):
                    os.unlink(temp_name)
def add_streams_from_m3u(streams, m3u_url, lang):
    """Adds (lang, qual) -> stream-uri items to stream from an m3u url.

    We don't use a formal m3u parser here; let's see how far we
    get before giving in to some extra dependency.
    """
    quality = None
    meta_re = re.compile("RESOLUTION=([^,]+)")
    audio_re = re.compile('TYPE=AUDIO,.*URI="([^"]*)"')
    audio_uri = None
    videos = []
    for ln in get_with_cache(m3u_url).decode("utf-8").split("\n"):
        if ln.startswith("#"):
            # remember the resolution of the most recent tag line (or
            # reset it if this tag line doesn't carry one)
            res_match = meta_re.search(ln)
            quality = res_match.group(1) if res_match else None
            audio_match = audio_re.search(ln)
            if audio_match:
                audio_uri = urllib.parse.urljoin(m3u_url, audio_match.group(1))
        elif quality is None:
            # URL-ish line without a preceding RESOLUTION tag: skip
            continue
        elif ln.startswith("http"):
            videos.append((quality, lang, ln.strip()))
        elif "/" in ln:
            # relative links
            videos.append((quality, lang,
                urllib.parse.urljoin(m3u_url, ln.strip())))
    for quality, lang, video_uri in videos:
        streams[quality, lang] = M3UStream(video_uri, audio_uri)
def get_items_from_arte(filter_chars, expire, days):
    """returns items currently available at arte for the last days.

    filter_chars, if not None, are characters that will be required in
    the titles or descriptions.

    expire is handed through to ArteProgram.
    """
    if filter_chars:
        needle = filter_chars.lower()
        def filter_func(item):
            if item.desc and needle in item.desc.lower():
                return True
            return needle in item.title.lower()
    else:
        def filter_func(item):
            return True
    program = ArteProgram(expire=expire)
    return list(filter(filter_func, program.get_items_since(days)))
def choose_program(args):
    """opens a UI to choose an item from arte+7's RSS feed and returns
    the link to the video page.

    args is whatever is returned by parse_command_line.
    """
    items = get_items_from_arte(
        filter_chars=args.filter_chars,
        expire=not args.keep_cache,
        days=args.days)
    with CursesUI(items) as ui:
        chosen = ui.choose()
    if chosen:
        return chosen.player_url
# arte sometimes gives labels these days, sometimes resolutions;
# list both here.
PREFERRED_BITRATES_HIGH = [
    "720p", "1280x720",
    # BUG FIX: a missing comma after "720x406" used to fuse it with
    # "406p low" into the single bogus entry "720x406406p low"
    "406p", "720x406",
    "406p low", "640x360",
    "360p",
    "216p", "384x216",
]
PREFERRED_BITRATES_MED = [
    "360p",
    "406p", "720x406",  # BUG FIX: same missing comma as above
    "406p low", "640x360",
    "216p", "384x216",
    "720p", "1280x720",
]
PREFERRED_BITRATES_LOW = list(reversed(PREFERRED_BITRATES_HIGH))
# the preference order actually used; main() may swap in HIGH or LOW
PREFERRED_BITRATES = PREFERRED_BITRATES_MED
def choose_stream(media_URLs, lang):
    """returns a preferred stream from media_URLs.

    media_URLs maps (quality label, language code) pairs to download
    URLs/streams; lang is matched as a substring of the offered language
    codes.  This uses the module-level PREFERRED_BITRATES (the previous
    docstring referred to a nonexistent PREFERRED_FORMATS).

    Raises an Exception when no offered language or bitrate matches.
    """
    available_langs = set(k[1] for k in media_URLs)
    matching_langs = set(l for l in available_langs
        if lang in l.lower())
    if not matching_langs:
        raise Exception("No media for language. Available: %s"%available_langs)
    # prefer the shortest matching language tag, i.e. the plainest match
    selected_lang = min(matching_langs, key=len)
    for bitrate in PREFERRED_BITRATES:
        if (bitrate, selected_lang) in media_URLs:
            return media_URLs[bitrate, selected_lang]
    else:
        raise Exception("No known bitrates/language among offered %s/%s"
            "(amend PREFERRED_BITRATES?)"%(available_langs, list(media_URLs.keys())))
def get_stream_parameters(embedding_URL):
    """guesses dest_name, download_URL from one of arte+7's video pages.

    Returns (dest_name, streams) where streams maps (quality, lang) to
    stream objects (or plain URLs for plain HTTP streams).
    """
    print(embedding_URL)
    dest_name = os.path.splitext(embedding_URL.split("/")[-1].split('?'
        )[0])[0]+".flv"
    if dest_name==".flv":
        dest_name = embedding_URL.split("/")[-2]+".flv"
    # step 0: obtain an API token. Sigh.
    auth_json = get_with_cache(ARTE_CONFIG_URL).decode("utf-8")
    try:
        token = json.loads(auth_json)["apiplayer"]["token"]
    except ValueError:
        # BUG FIX: this used to raise the undefined name "Error"
        raise ValueError("No token visible in %s"%auth_json)
    mh = {"Authorization": "Bearer "+token}
    # step 1: obtain JSON URL from embedding HTML
    # since 2019-07, this is is buried in some javascript emitted
    # from some PHP (no kidding), no more beautiful soup. I'll be a gardener.
    # If it's gone again: look for api.arte.tv here.
    tx = get_with_cache(embedding_URL, **mh).decode("utf-8")
    mat = re.search(r'[?&]json_url=(.*?)\"', tx)
    if mat:
        internal_API_URI = urllib.parse.unquote(mat.group(1))
        video_id = internal_API_URI.split('?')[0].split("/")[-1]
    else:
        # ok, next attempt: sometimes there's a humungous JSON literal
        # assigned to window.__INITIAL_STATE__. Somewhere deep within
        # it there's the video id.
        # NOTE(review): the original pattern ended in a bare lazy group
        # (always matching the empty string); it presumably lost its
        # trailing </script> anchor to the "<...>"-stripping corruption
        # seen elsewhere in this file -- TODO confirm against VCS.
        mat = re.search(
            r"(?s)window.__INITIAL_STATE__ = (.*?);?\s*</script>", tx)
        if mat:
            video_meta = json.loads(mat.group(1).strip().rstrip(";"))
            video_id = video_meta["pages"]["currentCode"]
            # it seems the last two _-separated fragments are language
            # and quality, which we do in the m3u
            video_id = "_".join(video_id.split("_")[:-2])
        else:
            sys.exit("No json_url in iframe src?")
    # step 2: obtain and parse the json; the api-internal URI that is
    # json_URL apparently doesn't work with the auth we've extracted
    # above. api.arte apparently does.
    api_URI = "https://api.arte.tv/api/player/v2/config/de/"+video_id
    video_meta = json.loads(get_with_cache(api_URI, **mh).decode("utf-8"))
    streams, raw_streams = {}, video_meta["data"]["attributes"]["streams"]
    pprint.pprint(raw_streams)
    for s in raw_streams:
        if s["protocol"].startswith("HTTP"):
            lang = label_to_langcode(s["versions"][0]["label"])
            qual = s["mainQuality"]["label"]
            if (qual, lang) not in streams:
                streams[qual, lang] = s["url"]
        elif s["protocol"] in ("HLS", "HLS_NG"):
            add_streams_from_m3u(streams,
                s["url"], label_to_langcode(s["versions"][0]["label"]))
        else:
            # unknown format; ignore for now.
            continue
    return dest_name, streams
def retrieve_local(args, arteurl):
    """pulls the media stream at arteurl using some local hacks.

    This is broken as of Dec 2021, but we may want to revive it in case
    streamlink lags.
    """
    dest_name, media_URLs = get_stream_parameters(arteurl)
    if args.output_name is not None:
        dest_name = args.output_name
    choose_stream(media_URLs, args.lang).retrieve(dest_name)
def retrieve_streamlink(args, arteurl):
    """pulls the media stream at arteurl via the streamlink package.

    The destination file name is args.output_name or derived from the URL.
    """
    import streamlink
    print(arteurl)
    options = {}
    # see module docstring: prefer a hand-installed ffmpeg for muxing
    # if one is present
    fallback_path = "/usr/local/ffmpeg/ffmpeg"
    if os.path.exists(fallback_path):
        options["ffmpeg-ffmpeg"] = fallback_path
    session = streamlink.Streamlink(options or None)
    streams = session.streams(arteurl)
    # (renamed loop variable: "format" shadowed the builtin)
    for fmt in PREFERRED_BITRATES:
        if fmt in streams:
            stream = streams[fmt]
            break
    else:
        raise Exception("Amend PREFERRED_BITRATES to include one of"
            f" {streams.keys()}")
    dest_name = args.output_name
    if dest_name is None:
        dest_name = os.path.splitext(arteurl.split("/")[-1].split('?'
            )[0])[0]+".mp4"
        if dest_name==".mp4":
            dest_name = arteurl.split("/")[-2]+".mp4"
    total_read = 0
    src = stream.open()
    try:
        # BUG FIX: the source stream used to stay open when writing failed
        with open(dest_name, "wb") as dest:
            with StatusDisplay() as disp:
                while True:
                    buf = src.read(2**20)
                    if not buf:
                        break
                    dest.write(buf)
                    total_read += len(buf)
                    disp.update("{}k".format(total_read//1024))
    finally:
        src.close()
def parse_cmd_line():
    """defines and parses the command line; returns an argparse namespace.
    """
    import argparse
    parser = argparse.ArgumentParser()
    arg = parser.add_argument
    arg("-s", "--sd", dest="low_quality", action="store_true",
        help="get video in low quality")
    arg("-x", "--hd", dest="high_quality", action="store_true",
        help="get video in high quality")
    arg("-l", "--lang", dest="lang", action="store",
        help="Language code (de or fr)", default="de")
    arg("-o", "--output", dest="output_name", action="store",
        help="Override output name", default=None)
    arg("-f", "--filter", dest="filter_chars", action="store",
        help="Characters that must be in title or description"
        " for a medium to get into the selection")
    arg("-d", "--days", dest="days", action="store",
        type=int, default=8,
        help="Check for programs that many days in the past.")
    arg("-K", "--keep-cache", dest="keep_cache",
        action="store_true",
        help="Keep cached files even when we think they're stale (use this"
        " if you run grabarte several times in close succession).")
    arg("arteurl", type=str, help="URL of the arte+7"
        " flash embedding URL", nargs="?", default=None)
    return parser.parse_args()
def main():
    """command line entry point: pick (or take) a URL, then download it.
    """
    global PREFERRED_BITRATES
    locale.setlocale(locale.LC_ALL, '')
    try:
        args = parse_cmd_line()
    except ImportError:
        # no argparse, assume defaults for small machine
        class args:
            arteurl = None
            low_quality = True
            # BUG FIX: the fallback used to lack the following attributes,
            # which the code below and choose_program unconditionally read.
            high_quality = False
            lang = "de"
            output_name = None
            filter_chars = None
            keep_cache = False
            days = 8
    if args.low_quality:
        PREFERRED_BITRATES = PREFERRED_BITRATES_LOW
    elif args.high_quality:
        PREFERRED_BITRATES = PREFERRED_BITRATES_HIGH
    arteurl = args.arteurl
    if arteurl is None:
        arteurl = choose_program(args)
    if arteurl and not arteurl.startswith("https:"):
        arteurl = "https://www.arte.tv"+arteurl
    if arteurl is None:
        return
    #retrieve_local(args, arteurl)
    retrieve_streamlink(args, arteurl)
# script entry point: only run when executed directly, not on import
if __name__=="__main__":
    main()