Merge pull request #18 from Emrvb/master

Refactored library and radionet class
This commit is contained in:
Mariusz
2023-03-06 18:14:53 +01:00
committed by GitHub
11 changed files with 902 additions and 296 deletions

View File

@@ -37,6 +37,7 @@ Mopidy-RadioNet to your Mopidy configuration file::
enabled = true enabled = true
language = pl # or net, de, at, fr, pt, es, dk, se, it language = pl # or net, de, at, fr, pt, es, dk, se, it
min_bitrate = 96 min_bitrate = 96
api_key = valid_api_key
favorite_stations = favorite_stations =
'bbcradio1' 'bbcradio1'
'bbcradio2' 'bbcradio2'
@@ -77,8 +78,8 @@ Mopidy-RadioNet to your Mopidy configuration file::
Project resources Project resources
================= =================
- `Source code <https://github.com/blackberrymamba/mopidy-radionet>`_ - `Source code <https://github.com/plintx/mopidy-radionet>`_
- `Issue tracker <https://github.com/blackberrymamba/mopidy-radionet/issues>`_ - `Issue tracker <https://github.com/plintx/mopidy-radionet/issues>`_
Changelog Changelog

View File

@@ -5,8 +5,7 @@ import os
from mopidy import config, ext from mopidy import config, ext
__version__ = "0.2.2"
__version__ = '0.2.2'
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -14,21 +13,23 @@ logger = logging.getLogger(__name__)
class Extension(ext.Extension): class Extension(ext.Extension):
dist_name = 'Mopidy-RadioNet' dist_name = "Mopidy-RadioNet"
ext_name = 'radionet' ext_name = "radionet"
version = __version__ version = __version__
def get_default_config(self): def get_default_config(self):
conf_file = os.path.join(os.path.dirname(__file__), 'ext.conf') conf_file = os.path.join(os.path.dirname(__file__), "ext.conf")
return config.read(conf_file) return config.read(conf_file)
def get_config_schema(self): def get_config_schema(self):
schema = super(Extension, self).get_config_schema() schema = super(Extension, self).get_config_schema()
schema['language'] = config.String() schema["language"] = config.String()
schema['min_bitrate'] = config.String() schema["min_bitrate"] = config.String()
schema['favorite_stations'] = config.List() schema["api_key"] = config.String()
schema["favorite_stations"] = config.List(True)
return schema return schema
def setup(self, registry): def setup(self, registry):
from .backend import RadioNetBackend from .backend import RadioNetBackend
registry.add('backend', RadioNetBackend)
registry.add("backend", RadioNetBackend)

View File

@@ -1,10 +1,8 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import time import re
from mopidy import backend
import pykka import pykka
from mopidy import backend
import mopidy_radionet import mopidy_radionet
@@ -18,31 +16,33 @@ class RadioNetBackend(pykka.ThreadingActor, backend.Backend):
def __init__(self, config, audio): def __init__(self, config, audio):
super(RadioNetBackend, self).__init__() super(RadioNetBackend, self).__init__()
self.radionet = RadioNetClient( self.radionet = RadioNetClient(
config['proxy'], config["proxy"],
"%s/%s" % ( "%s/%s"
mopidy_radionet.Extension.dist_name, % (mopidy_radionet.Extension.dist_name, mopidy_radionet.__version__),
mopidy_radionet.__version__)) )
self.library = RadioNetLibraryProvider(backend=self) self.library = RadioNetLibraryProvider(backend=self)
self.playback = RadioNetPlaybackProvider(audio=audio, backend=self)
self.uri_schemes = ['radionet'] self.uri_schemes = ["radionet"]
self.radionet.min_bitrate = int(config['radionet']['min_bitrate']) self.radionet.min_bitrate = int(config["radionet"]["min_bitrate"])
self.radionet.set_lang(str(config['radionet']['language'])) self.radionet.set_lang(str(config["radionet"]["language"]).strip())
self.radionet.set_favorites(tuple(file_ext.lower() for file_ext in config["radionet"]["favorite_stations"])) self.radionet.set_apikey(str(config["radionet"]["api_key"]))
self.radionet.set_favorites(
tuple(
file_ext.strip("'").lower() for file_ext in config["radionet"]["favorite_stations"]
)
)
def set_update_timeout(self, minutes=2):
self.update_timeout = time.time() + 60 * minutes
def on_start(self): class RadioNetPlaybackProvider(backend.PlaybackProvider):
self.set_update_timeout(0) def is_live(self, uri):
return True
def refresh(self, force=False): def translate_uri(self, uri):
if self.update_timeout is None: identifier = re.findall(r"^radionet:track:?([a-z0-9]+|\d+)?$", uri)
self.set_update_timeout() if identifier:
return self.backend.radionet.get_stream_url(identifier[0])
if force or time.time() > self.update_timeout: return None
self.radionet.get_top_stations()
self.radionet.get_local_stations()
self.radionet.get_favorites()
self.set_update_timeout()

View File

@@ -2,4 +2,5 @@
enabled = true enabled = true
language = pl language = pl
min_bitrate = 96 min_bitrate = 96
api_key = something
favorite_stations = favorite_stations =

View File

@@ -4,35 +4,51 @@ import logging
import re import re
from mopidy import backend from mopidy import backend
from mopidy.models import Album, Artist, Ref, SearchResult, Track from mopidy.models import Album, Artist, Ref, SearchResult, Track, Image
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class RadioNetLibraryProvider(backend.LibraryProvider): class RadioNetLibraryProvider(backend.LibraryProvider):
root_directory = Ref.directory(uri='radionet:root', name='Radio.net') root_directory = Ref.directory(uri="radionet:root", name="Radio.net")
def __init__(self, backend): def __init__(self, backend):
super().__init__(backend) super().__init__(backend)
def lookup(self, uri): def lookup(self, uri):
if not uri.startswith('radionet:'): if not uri.startswith("radionet:"):
return None return None
variant, identifier = self.parse_uri(uri) variant, identifier, sorting, page = self.parse_uri(uri)
if variant == 'station': if variant == "station" or variant == "track":
identifier = int(identifier) try:
radio_data = self.backend.radionet.get_station_by_id(identifier) identifier = int(identifier)
radio_data = self.backend.radionet.get_station_by_id(identifier)
except ValueError:
radio_data = self.backend.radionet.get_station_by_slug(identifier)
artist = Artist(name=radio_data.name) artist = Artist(name=radio_data.name)
name = ""
if radio_data.description is not None:
name = radio_data.description + " / "
name = (
name
+ radio_data.continent
+ " / "
+ radio_data.country
+ " - "
+ radio_data.city
)
album = Album( album = Album(
artists=[artist], artists=[artist],
name=radio_data.description + ' / ' + radio_data.continent + name=name,
' / ' + radio_data.country + ' - ' + radio_data.city, uri="radionet:station:%s" % radio_data.id,
uri='radionet:station:%s' % (identifier)) )
track = Track( track = Track(
artists=[artist], artists=[artist],
@@ -40,81 +56,239 @@ class RadioNetLibraryProvider(backend.LibraryProvider):
name=radio_data.name, name=radio_data.name,
genre=radio_data.genres, genre=radio_data.genres,
comment=radio_data.description, comment=radio_data.description,
uri=radio_data.stream_url) uri="radionet:track:%s" % radio_data.id,
)
return [track] return [track]
return [] return []
def browse(self, uri): def browse(self, uri):
self.backend.refresh()
directories = [] category, page, value, sorting = self.parse_uri(uri)
tracks = []
variant, identifier = self.parse_uri(uri) if category == "root":
if variant == 'root': return self._browse_root()
if self.backend.radionet.local_stations: elif category in ["favorites", "topstations", "localstations"]:
directories.append( return self._browse_category(category, page)
self.ref_directory( elif category in ["genres", "topics", "languages", "cities", "countries"]:
"radionet:category:localstations", "Local stations") return self._browse_sorted_category(category, value, sorting, page)
)
if self.backend.radionet.top_stations:
directories.append(
self.ref_directory(
"radionet:category:top100", "Top 100")
)
if self.backend.radionet.favorite_stations:
directories.append(
self.ref_directory(
"radionet:category:favorites", "Favorites")
)
return directories
elif variant == 'category' and identifier:
if identifier == "localstations":
for station in self.backend.radionet.local_stations:
tracks.append(self.station_to_ref(station))
if identifier == "top100":
for station in self.backend.radionet.top_stations:
tracks.append(self.station_to_ref(station))
if identifier == "favorites":
for station in self.backend.radionet.favorite_stations:
tracks.append(self.station_to_ref(station))
tracks.sort(key=lambda ref: ref.name)
return tracks
else: else:
logger.debug('Unknown URI: %s', uri) logger.debug("Unknown URI: %s", uri)
return [] return []
def get_images(self, uris):
images = {}
for uri in uris:
variant, identifier, sorting, page = self.parse_uri(uri)
station = self.backend.radionet.get_station_by_id(identifier)
if station:
images[uri] = []
if station.image_tiny:
images[uri].append(Image(uri=station.image_tiny, height=44, width=44))
if station.image_small:
images[uri].append(Image(uri=station.image_small, height=100, width=100))
if station.image_medium:
images[uri].append(Image(uri=station.image_medium, height=175, width=175))
if station.image_large:
images[uri].append(Image(uri=station.image_large, height=300, width=300))
return images
def _browse_root(self):
directories = [
self.ref_directory("radionet:topstations", "Top stations"),
self.ref_directory("radionet:localstations", "Local stations"),
self.ref_directory("radionet:genres", "Genres"),
self.ref_directory("radionet:topics", "Topics"),
self.ref_directory("radionet:languages", "Languages"),
self.ref_directory("radionet:cities", "Cities"),
self.ref_directory("radionet:countries", "Countries"),
]
if len(self.backend.radionet.favorites) > 0:
directories.insert(0, self.ref_directory("radionet:favorites", "Favorites"))
return directories
def _browse_category(self, category, page):
result = []
if category == "favorites":
items = self._get_favorites()
if items:
for item in items:
result.append(self.station_to_ref(item))
elif category == "topstations":
items = self._get_topstations()
if items:
for item in items:
result.append(self.station_to_ref(item))
elif not page:
pages = self._get_category_pages(category)
if pages == 1:
items = self._get_category(category, 1)
if items:
for item in items:
result.append(self.station_to_ref(item))
else:
for index in range(pages):
result.append(
self.ref_directory(
"radionet:{0}:{1}".format(category, str(index + 1)),
str(index + 1),
)
)
else:
items = self._get_category(category, page)
if items:
for item in items:
result.append(self.station_to_ref(item))
return result
def _browse_sorted_category(self, category, value, sorting, page):
result = []
if not value:
items = self.__getattribute__("_get_{0}".format(category))()
if items:
for item in items:
result.append(
self.ref_directory(
"radionet:{0}:{1}".format(category, item["systemEnglish"]),
item["localized"],
)
)
elif not sorting or sorting not in ["rank", "az"]:
result.append(
self.ref_directory(
"radionet:{0}:{1}:rank".format(category, value), "By rank"
)
)
result.append(
self.ref_directory(
"radionet:{0}:{1}:az".format(category, value), "Alphabetical"
)
)
elif not page:
pages = self._get_sorted_category_pages(category, value)
if pages == 1:
items = self._get_sorted_category(category, value, sorting, 1)
if items:
for item in items:
result.append(self.station_to_ref(item))
else:
for index in range(pages):
result.append(
self.ref_directory(
"radionet:{0}:{1}:{2}:{3}".format(
category, value, sorting, str(index + 1)
),
str(index + 1),
)
)
else:
items = self._get_sorted_category(category, value, sorting, page)
if items:
for item in items:
result.append(self.station_to_ref(item))
return result
def _get_genres(self):
return self.backend.radionet.get_genres()
def _get_topics(self):
return self.backend.radionet.get_topics()
def _get_languages(self):
return self.backend.radionet.get_languages()
def _get_cities(self):
return self.backend.radionet.get_cities()
def _get_countries(self):
return self.backend.radionet.get_countries()
def _get_topstations(self):
return self.backend.radionet.get_category("topstations", 1)
def _get_sorted_category(self, category, name, sorting, page):
return self.backend.radionet.get_sorted_category(category, name, sorting, page)
def _get_sorted_category_pages(self, category, name):
return self.backend.radionet.get_sorted_category_pages(category, name)
def _get_category(self, category, page):
return self.backend.radionet.get_category(category, page)
def _get_category_pages(self, category):
return self.backend.radionet.get_category_pages(category)
def _get_favorites(self):
return self.backend.radionet.get_favorites()
def search(self, query=None, uris=None, exact=False): def search(self, query=None, uris=None, exact=False):
if 'any' not in query: if "any" not in query:
return None return None
result = [] result = []
self.backend.radionet.do_search(' '.join(query['any'])) for station in self.backend.radionet.do_search(" ".join(query["any"])):
for station in self.backend.radionet.search_results:
result.append(self.station_to_track(station)) result.append(self.station_to_track(station))
return SearchResult( return SearchResult(tracks=result)
tracks=result
)
def station_to_ref(self, station): def station_to_ref(self, station):
return Ref.track( return Ref.track(
uri='radionet:station:%s' % (station.id), uri="radionet:station:%s" % station.id,
name=station.name, name=station.name,
) )
def station_to_track(self, station): def station_to_track(self, station):
ref = self.station_to_ref(station) ref = self.station_to_ref(station)
return Track(uri=ref.uri, name=ref.name, album=Album(uri=ref.uri, name=ref.name), return Track(
artists=[Artist(uri=ref.uri, name=ref.name)]) uri=ref.uri,
name=ref.name,
album=Album(uri=ref.uri, name=ref.name),
artists=[Artist(uri=ref.uri, name=ref.name)],
)
def ref_directory(self, uri, name): def ref_directory(self, uri, name):
return Ref.directory(uri=uri, name=name) return Ref.directory(uri=uri, name=name)
def ref_track(self, uri, name):
return Ref.track(uri=uri, name=name)
def parse_uri(self, uri): def parse_uri(self, uri):
result = re.findall(r'^radionet:([a-z]+):?([a-z0-9]+|\d+)?$', uri) category = None
value = None
page = None
sorting = None
result = re.findall(
r"^radionet:(genres|topics|languages|cities|countries)(:([^:]+)(:(rank|az)(:([0-9]+))?)?)?$",
uri,
)
if result: if result:
return result[0] category = result[0][0]
return None, None value = result[0][2]
sorting = result[0][4]
page = result[0][6]
else:
result = re.findall(
r"^radionet:(root|favorites|topstations|localstations|station|track)(:([0-9]+))?$",
uri,
)
if result:
category = result[0][0]
page = result[0][2]
else:
result = re.findall(
r"^radionet:(track):([^:]+)$",
uri,
)
if result:
category = result[0][0]
page = result[0][1]
return category, page, value, sorting

View File

@@ -3,12 +3,10 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import logging import logging
import re
import time import time
from mopidy import httpclient
import requests import requests
from mopidy import httpclient
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -21,256 +19,435 @@ class Station(object):
genres = None genres = None
name = None name = None
stream_url = None stream_url = None
image = None image_tiny = None
image_small = None
image_medium = None
image_large = None
description = None description = None
playable = False playable = False
class RadioNetClient(object): class RadioNetClient(object):
base_url = 'https://radio.net/' base_url = "https://radio.net/"
session = requests.Session() session = requests.Session()
api_key = None
api_prefix = None api_prefix = None
min_bitrate = 96 min_bitrate = 96
max_top_stations = 100 max_top_stations = 100
station_bookmarks = None station_bookmarks = None
api_key = None
stations_images = [] stations_images = []
top_stations = [] favorites = []
local_stations = []
search_results = [] cache = {}
stations_by_id = {}
stations_by_slug = {}
category_param_map = {
"genres": "genre",
"topics": "topic",
"languages": "language",
"cities": "city",
"countries": "country",
}
def __init__(self, proxy_config=None, user_agent=None): def __init__(self, proxy_config=None, user_agent=None):
super(RadioNetClient, self).__init__() super(RadioNetClient, self).__init__()
self.session = requests.Session() self.session = requests.Session()
if proxy_config is not None: if proxy_config is not None:
proxy = httpclient.format_proxy(proxy_config) proxy = httpclient.format_proxy(proxy_config)
self.session.proxies.update({'http': proxy, 'https': proxy}) self.session.proxies.update({"http": proxy, "https": proxy})
full_user_agent = httpclient.format_user_agent(user_agent) full_user_agent = httpclient.format_user_agent(user_agent)
self.session.headers.update({'user-agent': full_user_agent}) self.session.headers.update({"user-agent": full_user_agent})
self.session.headers.update({'cache-control': 'no-cache'}) self.session.headers.update({"cache-control": "no-cache"})
self.update_prefix()
def __del__(self):
self.session.close()
def set_lang(self, lang): def set_lang(self, lang):
langs = ['net', 'de', 'at', 'fr', 'pt', 'es', 'dk', 'se', 'it', 'pl'] if lang == "en":
lang = "net"
langs = ["net", "de", "at", "fr", "pt", "es", "dk", "se", "it", "pl"]
self.base_url = "https://radio.net/"
if lang in langs: if lang in langs:
self.base_url = self.base_url.replace(".net", "." + lang) self.base_url = self.base_url.replace(".net", "." + lang)
else: else:
logging.error("Radio.net not supported language: %s", str(lang)) logging.warning("Radio.net not supported language: %s, defaulting to English", str(lang))
self.update_prefix()
def flush(self): def update_prefix(self):
self.top_stations = [] lang = self.base_url.split(".")[-1].replace("/", "")
self.local_stations = []
self.search_results = []
def current_milli_time(self):
return int(round(time.time() * 1000))
def get_api_key(self):
if self.api_key is not None:
return
tmp_str = self.session.get(self.base_url)
# apiprefix_search = re.search('apiPrefix ?: ?\'(.*)\',?', tmp_str.content.decode())
# self.api_prefix = apiprefix_search.group(1)
lang = self.base_url.split('.')[-1].replace('/', '')
self.api_prefix = "https://api.radio." + lang + "/info/v2" self.api_prefix = "https://api.radio." + lang + "/info/v2"
apikey_search = re.search('apiKey ?: ?[\'|"](.*)[\'|"],?', tmp_str.content.decode()) def set_apikey(self, api_key):
self.api_key = apikey_search.group(1) self.api_key = api_key
logger.info('Radio.net: APIPREFIX %s' % self.api_prefix) def do_get(self, api_suffix, url_params=None):
logger.info('Radio.net: APIKEY %s' % self.api_key) if self.api_prefix is None:
return None
def do_post(self, api_sufix, url_params=None, payload=None): if url_params is None:
self.get_api_key() url_params = {}
url_params["apikey"] = self.api_key
if 'apikey' in url_params.keys(): response = self.session.get(self.api_prefix + api_suffix, params=url_params)
url_params['apikey'] = self.api_key
response = self.session.post(self.api_prefix + api_sufix,
params=url_params, data=payload)
return response return response
def do_get(self, api_sufix, url_params=None): def get_cache(self, key):
self.get_api_key() if self.cache.get(key) is not None and self.cache[key].expired() is False:
return self.cache[key].value()
return None
if 'apikey' in url_params.keys(): def set_cache(self, key, value, expires):
url_params['apikey'] = self.api_key self.cache[key] = CacheItem(value, expires)
return value
response = self.session.get(self.api_prefix + api_sufix,
params=url_params)
return response
def get_station_by_id(self, station_id): def get_station_by_id(self, station_id):
if not self.stations_by_id.get(station_id):
return self._get_station_by_id(station_id)
return self.stations_by_id.get(station_id)
api_suffix = '/search/station' def get_station_by_slug(self, station_slug):
if not self.stations_by_slug.get(station_slug):
return self._get_station_by_id(station_slug)
return self.stations_by_slug.get(station_slug)
def _get_station_by_id(self, station_id):
cache_key = "station/" + str(station_id)
cache = self.get_cache(cache_key)
if cache is not None:
return cache
api_suffix = "/search/station"
url_params = { url_params = {
'apikey': self.api_key, "station": station_id,
'_': self.current_milli_time(),
'station': station_id,
} }
response = self.do_get(api_suffix, url_params) response = self.do_get(api_suffix, url_params)
if response.status_code is not 200: if response.status_code != 200:
logger.error('Radio.net: Error on get station by id ' + logger.error(
str(station_id) + ". Error: " + response.text) "Radio.net: Error on get station by id "
+ str(station_id)
+ ". Error: "
+ response.text
)
return False return False
else:
logger.debug('Radio.net: Done get top stations list')
json = response.json()
logger.debug("Radio.net: Done get top stations list")
json = response.json()
if not self.stations_by_id.get(json["id"]):
station = Station() station = Station()
station.id = json['id'] station.playable = True
station.continent = json['continent'] else:
station.country = json['country'] station = self.stations_by_id[json["id"]]
station.city = json['city'] station.id = json["id"]
station.genres = ', '.join(json["genres"]) station.continent = json["continent"]
station.name = json['name'] station.country = json["country"]
station.stream_url = self.get_stream_url( station.city = json["city"]
json['streamUrls'], self.min_bitrate) station.genres = ", ".join(json["genres"])
station.image = json['logo100x100'] station.name = json["name"]
station.description = json['shortDescription'] station.slug = json["subdomain"]
if json['playable'] == 'PLAYABLE': station.stream_url = self._get_stream_url(json["streamUrls"], self.min_bitrate)
station.playable = True station.image_tiny = json["logo44x44"]
station.image_small = json["logo100x100"]
station.image_medium = json["logo175x175"]
station.image_large = json["logo300x300"]
station.description = json["shortDescription"]
if json["playable"] == "PLAYABLE":
station.playable = True
return station self.stations_by_id[station.id] = station
self.stations_by_slug[station.slug] = station
def get_local_stations(self): self.set_cache("station/" + str(station.id), station, 1440)
self.local_stations = [] self.set_cache("station/" + station.slug, station, 1440)
return station
api_suffix = '/search/localstations' def _get_station_from_search_result(self, result):
if not self.stations_by_id.get(result["id"]):
station = Station()
station.playable = True
else:
station = self.stations_by_id[result["id"]]
station.id = result["id"]
if result["continent"] is not None:
station.continent = result["continent"]["value"]
else:
station.continent = ""
if result["country"] is not None:
station.country = result["country"]["value"]
else:
station.country = ""
if result["city"] is not None:
station.city = result["city"]["value"]
else:
station.city = ""
if result["name"] is not None:
station.name = result["name"]["value"]
else:
station.name = ""
if result["subdomain"] is not None:
station.slug = result["subdomain"]["value"]
else:
station.slug = ""
if result["shortDescription"] is not None:
station.description = result["shortDescription"]["value"]
else:
station.description = ""
station.image_tiny = result["logo44x44"]
station.image_small = result["logo100x100"]
station.image_medium = result["logo175x175"]
self.stations_by_id[station.id] = station
self.stations_by_slug[station.slug] = station
return station
def get_genres(self):
return self._get_items("genres")
def get_topics(self):
return self._get_items("topics")
def get_languages(self):
return self._get_items("languages")
def get_cities(self):
return self._get_items("cities")
def get_countries(self):
return self._get_items("countries")
def _get_items(self, key):
cached = self.get_cache(key)
if cached is not None:
return cached
api_suffix = "/search/get" + key
response = self.do_get(api_suffix)
if response.status_code != 200:
logger.error(
"Radio.net: Error on get item list "
+ str(api_suffix)
+ ". Error: "
+ response.text
)
return False
return self.set_cache(key, response.json(), 1440)
def get_sorted_category(self, category, name, sorting, page):
results = []
for result in self._get_sorted_category(category, name, sorting, page):
results.append(self._get_station_from_search_result(result))
return results
def _get_sorted_category(self, category, name, sorting, page):
if sorting == "az":
sorting = "STATION_NAME"
else:
sorting = "RANK"
cache_key = category + "/" + name + "/" + sorting + "/" + str(page)
cache = self.get_cache(cache_key)
if cache is not None:
return cache
api_suffix = "/search/stationsby" + self.category_param_map[category]
url_params = { url_params = {
'apikey': self.api_key, self.category_param_map[category]: name,
'_': self.current_milli_time(), "sorttype": sorting,
'pageindex': 1, "sizeperpage": 50,
'sizeperpage': 100, "pageindex": page,
} }
response = self.do_post(api_suffix, url_params) response = self.do_get(api_suffix, url_params)
if response.status_code is not 200: if response.status_code != 200:
logger.error('Radio.net: Get local stations error. ' + logger.error(
response.text) "Radio.net: Error on get station by "
else: + str(category)
logger.debug('Radio.net: Done get local stations list') + ". Error: "
json = response.json() + response.text
for match in json['categories'][0]['matches']: )
station = self.get_station_by_id(match['id']) return False
if station:
if station.playable:
self.local_stations.append(station)
logger.info('Radio.net: Loaded ' + str(len(self.local_stations)) + json = response.json()
' local stations.') self.set_cache(category + "/" + name, int(json["numberPages"]), 10)
return self.set_cache(cache_key, json["categories"][0]["matches"], 10)
def get_category(self, category, page):
results = []
for result in self._get_category(category, page):
results.append(self._get_station_from_search_result(result))
return results
def _get_category(self, category, page):
cache_key = category + "/" + str(page)
cache = self.get_cache(cache_key)
if cache is not None:
return cache
api_suffix = "/search/" + category
url_params = {"sizeperpage": 50, "pageindex": page}
response = self.do_get(api_suffix, url_params)
if response.status_code != 200:
logger.error(
"Radio.net: Error on get station by "
+ str(category)
+ ". Error: "
+ response.text
)
return False
json = response.json()
self.set_cache(category, int(json["numberPages"]), 10)
return self.set_cache(cache_key, json["categories"][0]["matches"], 10)
def get_sorted_category_pages(self, category, name):
cache_key = category + "/" + name
cache = self.get_cache(cache_key)
if cache is not None:
return cache
self.get_sorted_category(category, name, "rank", 1)
return self.get_cache(cache_key)
def get_category_pages(self, category):
cache_key = category
cache = self.get_cache(cache_key)
if cache is not None:
return cache
self.get_category(category, 1)
return self.get_cache(cache_key)
def set_favorites(self, favorites): def set_favorites(self, favorites):
self.favortes = favorites self.favorites = favorites
def get_favorites(self): def get_favorites(self):
self.favorite_stations = [] cache_key = "favorites"
cache = self.get_cache(cache_key)
if cache is not None:
return cache
for station in self.favortes: favorite_stations = []
api_suffix = '/search/stationsonly' for station_slug in self.favorites:
url_params = {
'apikey': self.api_key,
'_': self.current_milli_time(),
'query': station,
'pageindex': 1,
}
response = self.do_post(api_suffix, url_params) station = self.get_station_by_slug(station_slug)
if response.status_code is not 200: if station is False:
logger.error('Radio.net: Search error ' + response.text) api_suffix = "/search/stationsonly"
else: url_params = {
logger.debug('Radio.net: Done search') "query": station_slug,
json = response.json() "pageindex": 1,
}
response = self.do_get(api_suffix, url_params)
# take only the first match! if response.status_code != 200:
station = self.get_station_by_id(json['categories'][0]['matches'][0]['id']) logger.error("Radio.net: Search error " + response.text)
if station and station.playable: else:
self.favorite_stations.append(station) logger.debug("Radio.net: Done search")
json = response.json()
logger.info('Radio.net: Loaded ' + str(len(self.favorite_stations)) + number_pages = int(json["numberPages"])
' favorite stations.')
def get_top_stations(self): if number_pages != 0:
self.top_stations = [] # take only the first match!
api_suffix = '/search/topstations' station = self._get_station_from_search_result(
json["categories"][0]["matches"][0]
)
else:
logger.warning("Radio.net: No results for %s", station_slug)
if station and station.playable:
favorite_stations.append(station)
logger.info(
"Radio.net: Loaded " + str(len(favorite_stations)) + " favorite stations."
)
return self.set_cache(cache_key, favorite_stations, 1440)
def do_search(self, query_string, page_index=1, search_results=None):
api_suffix = "/search/stationsonly"
url_params = { url_params = {
'apikey': self.api_key, "query": query_string,
'_': self.current_milli_time(), "sizeperpage": 50,
'pageindex': 1, "pageindex": page_index,
'sizeperpage': 100,
} }
response = self.do_post(api_suffix, url_params) response = self.do_get(api_suffix, url_params)
if response.status_code is not 200: if response.status_code != 200:
logger.error('Radio.net: Get top stations error. ' + response.text) logger.error("Radio.net: Search error " + response.text)
else: else:
logger.debug('Radio.net: Done get top stations list') logger.debug("Radio.net: Done search")
if search_results is None:
search_results = []
json = response.json() json = response.json()
for match in json['categories'][0]['matches']: for match in json["categories"][0]["matches"]:
station = self.get_station_by_id(match['id']) station = self._get_station_from_search_result(match)
if station:
if station.playable:
self.top_stations.append(station)
logger.info('Radio.net: Loaded ' + str(len(self.top_stations)) +
' top stations.')
def do_search(self, query_string, page_index=1):
if page_index == 1:
self.search_results = []
api_suffix = '/search/stationsonly'
url_params = {
'apikey': self.api_key,
'_': self.current_milli_time(),
'query': query_string,
'pageindex': page_index,
}
response = self.do_post(api_suffix, url_params)
if response.status_code is not 200:
logger.error('Radio.net: Search error ' + response.text)
else:
logger.debug('Radio.net: Done search')
json = response.json()
for match in json['categories'][0]['matches']:
station = self.get_station_by_id(match['id'])
if station and station.playable: if station and station.playable:
self.search_results.append(station) search_results.append(station)
number_pages = int(json['numberPages']) number_pages = int(json["numberPages"])
if number_pages >= page_index: if number_pages >= page_index:
self.do_search(query_string, page_index + 1) self.do_search(query_string, page_index + 1, search_results)
else: else:
logger.info('Radio.net: Found ' + logger.info(
str(len(self.search_results)) + ' stations.') "Radio.net: Found " + str(len(search_results)) + " stations."
)
return search_results
def get_stream_url(self, stream_json, bit_rate): def get_stream_url(self, station_id):
station = self.get_station_by_id(station_id)
if not station.stream_url:
station = self._get_station_by_id(station.id)
return station.stream_url
def _get_stream_url(self, stream_json, bit_rate):
stream_url = None stream_url = None
for stream in stream_json: for stream in stream_json:
if int(stream['bitRate']) >= bit_rate and \ if int(stream["bitRate"]) >= bit_rate and stream["streamStatus"] == "VALID":
stream['streamStatus'] == 'VALID': stream_url = stream["streamUrl"]
stream_url = stream['streamUrl']
break break
if stream_url is None and len(stream_json) > 0: if stream_url is None and len(stream_json) > 0:
stream_url = stream_json[0]['streamUrl'] stream_url = stream_json[0]["streamUrl"]
return stream_url return stream_url
class CacheItem(object):
def __init__(self, value, expires=10):
self._value = value
self._expires = time.time() + expires * 60
def expired(self):
return self._expires < time.time()
def value(self):
return self._value

View File

@@ -1,3 +1,52 @@
[metadata]
name = Mopidy-RadioNet
version = 0.2.2
url = https://github.com/plintx/mopidy-radionet
license = Apache License, Version 2.0
license_file = LICENSE
description = Mopidy extension for playing music from Radio.net
long_description = file: README.rst
classifiers =
Environment :: No Input/Output (Daemon)
Intended Audience :: End Users/Desktop
License :: OSI Approved :: Apache Software License
Operating System :: OS Independent
Programming Language :: Python :: 3
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Topic :: Multimedia :: Sound/Audio :: Players
[options]
zip_safe = False
include_package_data = True
packages = find:
python_requires = >= 3.7
install_requires =
Mopidy >= 3.0.0
Pykka >= 2.0.1
pyspotify >= 2.0.5
requests >= 2.20.0
setuptools
[options.extras_require]
lint =
black
check-manifest
flake8
flake8-black
flake8-bugbear
flake8-import-order
isort
test =
pytest
pytest-cov
responses
dev =
%(lint)s
%(test)s
[flake8] [flake8]
application-import-names = mopidy_radionet,tests application-import-names = mopidy_radionet,tests
exclude = .git,.tox exclude = .git,.tox

View File

@@ -7,7 +7,7 @@ from setuptools import find_packages, setup
def get_version(filename): def get_version(filename):
with open(filename) as fh: with open(filename) as fh:
metadata = dict(re.findall("__([a-z]+)__ = '([^']+)'", fh.read())) metadata = dict(re.findall("__([a-z]+)__ = \"([^\"]+)\"", fh.read()))
return metadata['version'] return metadata['version']

27
tests/conftest.py Normal file
View File

@@ -0,0 +1,27 @@
from unittest import mock
import pytest
from mopidy_radionet import backend
from mopidy_radionet.radionet import RadioNetClient
from mopidy_radionet.library import RadioNetLibraryProvider
@pytest.fixture
def backend_mock():
backend_mock = mock.Mock(spec=backend.RadioNetBackend)
backend_mock.radionet = RadioNetClient(proxy_config=None)
backend_mock.library = RadioNetLibraryProvider(backend=backend_mock)
backend_mock.radionet.set_apikey('test')
backend_mock.radionet.set_favorites({'lush'})
return backend_mock
@pytest.fixture
def library(backend_mock):
return backend_mock.library
@pytest.fixture
def radionet(backend_mock):
return backend_mock.radionet

172
tests/test_library.py Normal file
View File

@@ -0,0 +1,172 @@
def test_browse_root(library):
results = library.browse('radionet:root')
assert 8 == len(results)
def test_browse_localstations(library):
results = library.browse('radionet:localstations')
assert len(results) > 0
page_uri = results[0].uri if results is not None else None
assert page_uri is not None
# 1 Page, not results
# results = library.browse(page_uri)
# assert len(results) > 0
def test_browse_topstations(library):
results = library.browse('radionet:topstations')
assert len(results) > 0
def test_browse_genres(library):
results = library.browse('radionet:genres')
assert len(results) > 0
cat_uri = results[0].uri if results is not None else None
assert cat_uri is not None
results = library.browse(cat_uri)
assert len(results) == 2
sort_uri = results[0].uri if results is not None else None
assert sort_uri is not None
results = library.browse(sort_uri)
assert len(results) > 0
page_uri = results[0].uri if results is not None else None
assert page_uri is not None
results = library.browse(page_uri)
assert len(results) > 0
def test_browse_topics(library):
results = library.browse('radionet:topics')
assert len(results) > 0
cat_uri = results[0].uri if results is not None else None
assert cat_uri is not None
results = library.browse(cat_uri)
assert len(results) == 2
sort_uri = results[0].uri if results is not None else None
assert sort_uri is not None
results = library.browse(sort_uri)
assert len(results) > 0
page_uri = results[0].uri if results is not None else None
assert page_uri is not None
# 1 Page, not results
# results = library.browse(page_uri)
# assert len(results) > 0
def test_browse_languages(library):
results = library.browse('radionet:languages')
assert len(results) > 0
cat_uri = results[5].uri if results is not None else None
assert cat_uri is not None
results = library.browse(cat_uri)
assert len(results) == 2
sort_uri = results[0].uri if results is not None else None
assert sort_uri is not None
results = library.browse(sort_uri)
assert len(results) > 0
page_uri = results[0].uri if results is not None else None
assert page_uri is not None
# 1 Page, not results
# results = library.browse(page_uri)
# assert len(results) > 0
def test_browse_cities(library):
results = library.browse('radionet:cities')
assert len(results) > 0
cat_uri = results[0].uri if results is not None else None
assert cat_uri is not None
results = library.browse(cat_uri)
assert len(results) == 2
sort_uri = results[0].uri if results is not None else None
assert sort_uri is not None
results = library.browse(sort_uri)
assert len(results) > 0
page_uri = results[0].uri if results is not None else None
assert page_uri is not None
# 1 Page, not results
# results = library.browse(page_uri)
# assert len(results) > 0
def test_browse_countries(library):
results = library.browse('radionet:countries')
assert len(results) > 0
cat_uri = results[0].uri if results is not None else None
assert cat_uri is not None
results = library.browse(cat_uri)
assert len(results) == 2
sort_uri = results[0].uri if results is not None else None
assert sort_uri is not None
results = library.browse(sort_uri)
assert len(results) > 0
page_uri = results[0].uri if results is not None else None
assert page_uri is not None
# 1 Page, not results
# results = library.browse(page_uri)
# assert len(results) > 0
def test_browse_favorites(library):
results = library.browse('radionet:favorites')
assert 1 == len(results)
def test_search(library):
result = library.search({'any': ['radio ram']})
assert len(result.tracks) > 0
old_length = len(result.tracks)
result = library.search({'any': ['radio ram']})
assert len(result.tracks) == old_length
def test_lookup(library):
results = library.browse('radionet:favorites')
assert 1 == len(results)
for result in results:
assert library.lookup(result.uri) is not None
def test_track_by_slug(library):
results = library.lookup('radionet:track:dancefm')
assert 1 == len(results)
assert results[0].uri == 'radionet:track:2180'

View File

@@ -1,38 +1,42 @@
import unittest
from mopidy_radionet.radionet import RadioNetClient
class RadioNetClientTest(unittest.TestCase): def test_get_genres(radionet):
genres = radionet.get_genres()
def test_get_api_key(self): assert len(genres) > 0
radionet = RadioNetClient()
radionet.get_api_key()
self.assertIsNotNone(radionet.api_key)
def test_get_top_stations(self):
radionet = RadioNetClient()
radionet.get_top_stations()
self.assertGreater(len(radionet.top_stations), 0)
def test_get_local_stations(self):
radionet = RadioNetClient()
radionet.get_local_stations()
self.assertGreater(len(radionet.local_stations), 0)
def test_do_search(self):
radionet = RadioNetClient()
radionet.do_search("radio ram")
self.assertGreater(len(radionet.search_results), 0)
def test_get_favorites(self):
test_favorites = ("Rock Antenne", "radio ram")
radionet = RadioNetClient()
radionet.set_favorites(test_favorites)
radionet.get_favorites()
self.assertEqual(len(radionet.favorite_stations), len(test_favorites))
if __name__ == "__main__": def test_get_top_stations(radionet):
unittest.main() result = radionet.get_category('topstations', 1)
assert len(result) > 0
def test_get_local_stations(radionet):
result = radionet.get_category('localstations', 1)
assert len(result) > 0
def test_do_search(radionet):
result = radionet.do_search("radio ram")
assert len(result) > 0
assert result[0].stream_url is None
assert radionet.get_stream_url(result[0].id) is not None
def test_get_favorites(radionet):
radionet.cache = {};
test_favorites = ["Rock Antenne", "radio ram", "eska", "dancefm"]
radionet.set_favorites(test_favorites)
result = radionet.get_favorites()
assert len(result) == len(test_favorites)
assert result[2].name == 'Eska'
def test_favorites_broken_slug(radionet):
radionet.cache = {};
test_favorites = ["radio357"]
radionet.set_favorites(test_favorites)
result = radionet.get_favorites()
assert len(result) == 0