Python3 Migrate

This commit is contained in:
MariuszC
2020-01-18 20:01:00 +01:00
parent ea05af2d15
commit 6cd7e0fe44
691 changed files with 201846 additions and 598 deletions

View File

@@ -0,0 +1,321 @@
import configparser
import itertools
import logging
import os
import pathlib
import re
from collections.abc import Mapping
from mopidy.config import keyring
from mopidy.config.schemas import ConfigSchema, MapConfigSchema
from mopidy.config.types import (
Boolean,
ConfigValue,
Deprecated,
DeprecatedValue,
Hostname,
Integer,
List,
LogColor,
LogLevel,
Path,
Port,
Secret,
String,
)
from mopidy.internal import path, versioning
__all__ = [
# TODO List everything that is reexported, not just the unused parts.
"ConfigValue",
"List",
]
logger = logging.getLogger(__name__)
_core_schema = ConfigSchema("core")
_core_schema["cache_dir"] = Path()
_core_schema["config_dir"] = Path()
_core_schema["data_dir"] = Path()
# MPD supports at most 10k tracks, some clients segfault when this is exceeded.
_core_schema["max_tracklist_length"] = Integer(minimum=1)
_core_schema["restore_state"] = Boolean(optional=True)
_logging_schema = ConfigSchema("logging")
_logging_schema["verbosity"] = Integer(minimum=-1, maximum=4)
_logging_schema["format"] = String()
_logging_schema["color"] = Boolean()
_logging_schema["console_format"] = Deprecated()
_logging_schema["debug_format"] = Deprecated()
_logging_schema["debug_file"] = Deprecated()
_logging_schema["config_file"] = Path(optional=True)
_loglevels_schema = MapConfigSchema("loglevels", LogLevel())
_logcolors_schema = MapConfigSchema("logcolors", LogColor())
_audio_schema = ConfigSchema("audio")
_audio_schema["mixer"] = String()
_audio_schema["mixer_track"] = Deprecated()
_audio_schema["mixer_volume"] = Integer(optional=True, minimum=0, maximum=100)
_audio_schema["output"] = String()
_audio_schema["visualizer"] = Deprecated()
_audio_schema["buffer_time"] = Integer(optional=True, minimum=1)
_proxy_schema = ConfigSchema("proxy")
_proxy_schema["scheme"] = String(
optional=True, choices=["http", "https", "socks4", "socks5"]
)
_proxy_schema["hostname"] = Hostname(optional=True)
_proxy_schema["port"] = Port(optional=True)
_proxy_schema["username"] = String(optional=True)
_proxy_schema["password"] = Secret(optional=True)
# NOTE: if multiple outputs ever comes something like LogLevelConfigSchema
# _outputs_schema = config.AudioOutputConfigSchema()
_schemas = [
_core_schema,
_logging_schema,
_loglevels_schema,
_logcolors_schema,
_audio_schema,
_proxy_schema,
]
_INITIAL_HELP = """
# For further information about options in this file see:
# https://docs.mopidy.com/
#
# The initial commented out values reflect the defaults as of:
# {versions}
#
# Available options and defaults might have changed since then,
# run `mopidy config` to see the current effective config and
# `mopidy --version` to check the current version.
"""
def read(config_file):
"""Helper to load config defaults in same way across core and extensions"""
return pathlib.Path(config_file).read_text(errors="surrogateescape")
def load(files, ext_schemas, ext_defaults, overrides):
config_dir = pathlib.Path(__file__).parent
defaults = [read(config_dir / "default.conf")]
defaults.extend(ext_defaults)
raw_config = _load(files, defaults, keyring.fetch() + (overrides or []))
schemas = _schemas[:]
schemas.extend(ext_schemas)
return _validate(raw_config, schemas)
def format(config, ext_schemas, comments=None, display=True):
schemas = _schemas[:]
schemas.extend(ext_schemas)
return _format(config, comments or {}, schemas, display, False)
def format_initial(extensions_data):
config_dir = pathlib.Path(__file__).parent
defaults = [read(config_dir / "default.conf")]
defaults.extend(d.extension.get_default_config() for d in extensions_data)
raw_config = _load([], defaults, [])
schemas = _schemas[:]
schemas.extend(d.extension.get_config_schema() for d in extensions_data)
config, errors = _validate(raw_config, schemas)
versions = [f"Mopidy {versioning.get_version()}"]
extensions_data = sorted(
extensions_data, key=lambda d: d.extension.dist_name
)
for data in extensions_data:
versions.append(f"{data.extension.dist_name} {data.extension.version}")
header = _INITIAL_HELP.strip().format(versions="\n# ".join(versions))
formatted_config = _format(
config=config, comments={}, schemas=schemas, display=False, disable=True
)
return header + "\n\n" + formatted_config
def _load(files, defaults, overrides):
parser = configparser.RawConfigParser()
# TODO: simply return path to config file for defaults so we can load it
# all in the same way?
logger.info("Loading config from builtin defaults")
for default in defaults:
if isinstance(default, bytes):
default = default.decode()
parser.read_string(default)
# Load config from a series of config files
for f in files:
f = path.expand_path(f)
if f.is_dir():
for g in f.iterdir():
if g.is_file() and g.suffix == ".conf":
_load_file(parser, g.resolve())
else:
_load_file(parser, f.resolve())
raw_config = {}
for section in parser.sections():
raw_config[section] = dict(parser.items(section))
logger.info("Loading config from command line options")
for section, key, value in overrides:
raw_config.setdefault(section, {})[key] = value
return raw_config
def _load_file(parser, file_path):
if not file_path.exists():
logger.debug(
f"Loading config from {file_path.as_uri()} failed; "
f"it does not exist"
)
return
if not os.access(str(file_path), os.R_OK):
logger.warning(
f"Loading config from file_path.as_uri() failed; "
f"read permission missing"
)
return
try:
logger.info(f"Loading config from {file_path.as_uri()}")
with file_path.open("r") as fh:
parser.read_file(fh)
except configparser.MissingSectionHeaderError:
logger.warning(
f"Loading config from {file_path.as_uri()} failed; "
f"it does not have a config section"
)
except configparser.ParsingError as e:
linenos = ", ".join(str(lineno) for lineno, line in e.errors)
logger.warning(
f"Config file {file_path.as_uri()} has errors; "
f"line {linenos} has been ignored"
)
except OSError:
# TODO: if this is the initial load of logging config we might not
# have a logger at this point, we might want to handle this better.
logger.debug(f"Config file {file_path.as_uri()} not found; skipping")
def _validate(raw_config, schemas):
# Get validated config
config = {}
errors = {}
sections = set(raw_config)
for schema in schemas:
sections.discard(schema.name)
values = raw_config.get(schema.name, {})
result, error = schema.deserialize(values)
if error:
errors[schema.name] = error
if result:
config[schema.name] = result
for section in sections:
logger.debug(f"Ignoring unknown config section: {section}")
return config, errors
def _format(config, comments, schemas, display, disable):
output = []
for schema in schemas:
serialized = schema.serialize(
config.get(schema.name, {}), display=display
)
if not serialized:
continue
output.append(f"[{schema.name}]")
for key, value in serialized.items():
if isinstance(value, DeprecatedValue):
continue
comment = comments.get(schema.name, {}).get(key, "")
output.append(f"{key} =")
if value is not None:
output[-1] += " " + value
if comment:
output[-1] += " ; " + comment.capitalize()
if disable:
output[-1] = re.sub(r"^", "#", output[-1], flags=re.M)
output.append("")
return "\n".join(output).strip()
def _preprocess(config_string):
"""Convert a raw config into a form that preserves comments etc."""
results = ["[__COMMENTS__]"]
counter = itertools.count(0)
section_re = re.compile(r"^(\[[^\]]+\])\s*(.+)$")
blank_line_re = re.compile(r"^\s*$")
comment_re = re.compile(r"^(#|;)")
inline_comment_re = re.compile(r" ;")
def newlines(match):
return f"__BLANK{next(counter):d}__ ="
def comments(match):
if match.group(1) == "#":
return f"__HASH{next(counter):d}__ ="
elif match.group(1) == ";":
return f"__SEMICOLON{next(counter):d}__ ="
def inlinecomments(match):
return f"\n__INLINE{next(counter):d}__ ="
def sections(match):
return (
f"{match.group(1)}\n__SECTION{next(counter):d}__ = {match.group(2)}"
)
for line in config_string.splitlines():
line = blank_line_re.sub(newlines, line)
line = section_re.sub(sections, line)
line = comment_re.sub(comments, line)
line = inline_comment_re.sub(inlinecomments, line)
results.append(line)
return "\n".join(results)
def _postprocess(config_string):
"""Converts a preprocessed config back to original form."""
flags = re.IGNORECASE | re.MULTILINE
result = re.sub(r"^\[__COMMENTS__\](\n|$)", "", config_string, flags=flags)
result = re.sub(r"\n__INLINE\d+__ =(.*)$", r" ;\g<1>", result, flags=flags)
result = re.sub(r"^__HASH\d+__ =(.*)$", r"#\g<1>", result, flags=flags)
result = re.sub(r"^__SEMICOLON\d+__ =(.*)$", r";\g<1>", result, flags=flags)
result = re.sub(r"\n__SECTION\d+__ =(.*)$", r"\g<1>", result, flags=flags)
result = re.sub(r"^__BLANK\d+__ =$", "", result, flags=flags)
return result
class Proxy(Mapping):
def __init__(self, data):
self._data = data
def __getitem__(self, key):
item = self._data.__getitem__(key)
if isinstance(item, dict):
return Proxy(item)
return item
def __iter__(self):
return self._data.__iter__()
def __len__(self):
return self._data.__len__()
def __repr__(self):
return f"Proxy({self._data!r})"

View File

@@ -0,0 +1,25 @@
[core]
cache_dir = $XDG_CACHE_DIR/mopidy
config_dir = $XDG_CONFIG_DIR/mopidy
data_dir = $XDG_DATA_DIR/mopidy
max_tracklist_length = 10000
restore_state = false
[logging]
verbosity = 0
format = %(levelname)-8s %(asctime)s [%(process)d:%(threadName)s] %(name)s\n %(message)s
color = true
config_file =
[audio]
mixer = software
mixer_volume =
output = autoaudiosink
buffer_time =
[proxy]
scheme =
hostname =
port =
username =
password =

View File

@@ -0,0 +1,177 @@
import logging
logger = logging.getLogger(__name__)
try:
import dbus
except ImportError:
dbus = None
# XXX: Hack to workaround introspection bug caused by gnome-keyring, should be
# fixed by version 3.5 per:
# https://git.gnome.org/browse/gnome-keyring/commit/?id=5dccbe88eb94eea9934e2b7
if dbus:
EMPTY_STRING = dbus.String("", variant_level=1)
else:
EMPTY_STRING = ""
FETCH_ERROR = (
"Fetching passwords from your keyring failed. Any passwords "
"stored in the keyring will not be available."
)
def fetch():
if not dbus:
logger.debug("%s (dbus not installed)", FETCH_ERROR)
return []
try:
bus = dbus.SessionBus()
except dbus.exceptions.DBusException as e:
logger.debug("%s (%s)", FETCH_ERROR, e)
return []
if not bus.name_has_owner("org.freedesktop.secrets"):
logger.debug(
"%s (org.freedesktop.secrets service not running)", FETCH_ERROR
)
return []
service = _service(bus)
session = service.OpenSession("plain", EMPTY_STRING)[1]
items, locked = service.SearchItems({"service": "mopidy"})
if not locked and not items:
return []
if locked:
# There is a chance we can unlock without prompting the users...
items, prompt = service.Unlock(locked)
if prompt != "/":
_prompt(bus, prompt).Dismiss()
logger.debug("%s (Keyring is locked)", FETCH_ERROR)
return []
result = []
secrets = service.GetSecrets(items, session, byte_arrays=True)
for item_path, values in secrets.items():
session_path, parameters, value, content_type = values
attrs = _item_attributes(bus, item_path)
result.append((attrs["section"], attrs["key"], bytes(value)))
return result
def set(section, key, value):
"""Store a secret config value for a given section/key.
Indicates if storage failed or succeeded.
"""
if not dbus:
logger.debug(
"Saving %s/%s to keyring failed. (dbus not installed)", section, key
)
return False
try:
bus = dbus.SessionBus()
except dbus.exceptions.DBusException as e:
logger.debug("Saving %s/%s to keyring failed. (%s)", section, key, e)
return False
if not bus.name_has_owner("org.freedesktop.secrets"):
logger.debug(
"Saving %s/%s to keyring failed. "
"(org.freedesktop.secrets service not running)",
section,
key,
)
return False
service = _service(bus)
collection = _collection(bus)
if not collection:
return False
if isinstance(value, str):
value = value.encode()
session = service.OpenSession("plain", EMPTY_STRING)[1]
secret = dbus.Struct(
(session, "", dbus.ByteArray(value), "plain/text; charset=utf8")
)
label = f"mopidy: {section}/{key}"
attributes = {"service": "mopidy", "section": section, "key": key}
properties = {
"org.freedesktop.Secret.Item.Label": label,
"org.freedesktop.Secret.Item.Attributes": attributes,
}
try:
item, prompt = collection.CreateItem(properties, secret, True)
except dbus.exceptions.DBusException as e:
# TODO: catch IsLocked errors etc.
logger.debug("Saving %s/%s to keyring failed. (%s)", section, key, e)
return False
if prompt == "/":
return True
_prompt(bus, prompt).Dismiss()
logger.debug(
"Saving secret %s/%s failed. (Keyring is locked)", section, key
)
return False
def _service(bus):
return _interface(
bus, "/org/freedesktop/secrets", "org.freedesktop.Secret.Service"
)
# NOTE: depending on versions and setup 'default' might not exists, so try and
# use it but fall back to the 'login' collection, and finally the 'session' one
# if all else fails. We should probably create a keyring/collection setting
# that allows users to set this so they have control over where their secrets
# get stored.
def _collection(bus):
for name in "aliases/default", "collection/login", "collection/session":
path = "/org/freedesktop/secrets/" + name
if _collection_exists(bus, path):
break
else:
return None
return _interface(bus, path, "org.freedesktop.Secret.Collection")
# NOTE: Hack to probe if a given collection actually exists. Needed to work
# around an introspection bug in setting passwords for non-existant aliases.
def _collection_exists(bus, path):
try:
item = _interface(bus, path, "org.freedesktop.DBus.Properties")
item.Get("org.freedesktop.Secret.Collection", "Label")
return True
except dbus.exceptions.DBusException:
return False
# NOTE: We could call prompt.Prompt('') to unlock the keyring when it is not
# '/', but we would then also have to arrange to setup signals to wait until
# this has been completed. So for now we just dismiss the prompt and expect
# keyrings to be unlocked.
def _prompt(bus, path):
return _interface(bus, path, "Prompt")
def _item_attributes(bus, path):
item = _interface(bus, path, "org.freedesktop.DBus.Properties")
result = item.Get("org.freedesktop.Secret.Item", "Attributes")
return {bytes(k): bytes(v) for k, v in result.items()}
def _interface(bus, path, interface):
obj = bus.get_object("org.freedesktop.secrets", path)
return dbus.Interface(obj, interface)

View File

@@ -0,0 +1,125 @@
import collections
from mopidy.config import types
def _did_you_mean(name, choices):
"""Suggest most likely setting based on levenshtein."""
if not choices:
return None
name = name.lower()
candidates = [(_levenshtein(name, c), c) for c in choices]
candidates.sort()
if candidates[0][0] <= 3:
return candidates[0][1]
return None
def _levenshtein(a, b):
"""Calculates the Levenshtein distance between a and b."""
n, m = len(a), len(b)
if n > m:
return _levenshtein(b, a)
current = range(n + 1)
for i in range(1, m + 1):
previous, current = current, [i] + [0] * n
for j in range(1, n + 1):
add, delete = previous[j] + 1, current[j - 1] + 1
change = previous[j - 1]
if a[j - 1] != b[i - 1]:
change += 1
current[j] = min(add, delete, change)
return current[n]
class ConfigSchema(collections.OrderedDict):
"""Logical group of config values that correspond to a config section.
Schemas are set up by assigning config keys with config values to
instances. Once setup :meth:`deserialize` can be called with a dict of
values to process. For convienience we also support :meth:`format` method
that can used for converting the values to a dict that can be printed and
:meth:`serialize` for converting the values to a form suitable for
persistence.
"""
def __init__(self, name):
super().__init__()
self.name = name
def deserialize(self, values):
"""Validates the given ``values`` using the config schema.
Returns a tuple with cleaned values and errors.
"""
errors = {}
result = {}
for key, value in values.items():
try:
result[key] = self[key].deserialize(value)
except KeyError: # not in our schema
errors[key] = "unknown config key."
suggestion = _did_you_mean(key, self.keys())
if suggestion:
errors[key] += f" Did you mean {suggestion!r}?"
except ValueError as e: # deserialization failed
result[key] = None
errors[key] = str(e)
for key in self.keys():
if isinstance(self[key], types.Deprecated):
result.pop(key, None)
elif key not in result and key not in errors:
result[key] = None
errors[key] = "config key not found."
return result, errors
def serialize(self, values, display=False):
"""Converts the given ``values`` to a format suitable for persistence.
If ``display`` is :class:`True` secret config values, like passwords,
will be masked out.
Returns a dict of config keys and values."""
result = collections.OrderedDict()
for key in self.keys():
if key in values:
result[key] = self[key].serialize(values[key], display)
return result
class MapConfigSchema:
"""Schema for handling multiple unknown keys with the same type.
Does not sub-class :class:`ConfigSchema`, but implements the same
serialize/deserialize interface.
"""
def __init__(self, name, value_type):
self.name = name
self._value_type = value_type
def deserialize(self, values):
errors = {}
result = {}
for key, value in values.items():
try:
result[key] = self._value_type.deserialize(value)
except ValueError as e: # deserialization failed
result[key] = None
errors[key] = str(e)
return result, errors
def serialize(self, values, display=False):
result = collections.OrderedDict()
for key in sorted(values.keys()):
result[key] = self._value_type.serialize(values[key], display)
return result

View File

@@ -0,0 +1,323 @@
import logging
import re
import socket
from mopidy.config import validators
from mopidy.internal import log, path
def decode(value):
if isinstance(value, bytes):
value = value.decode(errors="surrogateescape")
for char in ("\\", "\n", "\t"):
value = value.replace(
char.encode(encoding="unicode-escape").decode(), char
)
return value
def encode(value):
if isinstance(value, bytes):
value = value.decode(errors="surrogateescape")
for char in ("\\", "\n", "\t"):
value = value.replace(
char, char.encode(encoding="unicode-escape").decode()
)
return value
class DeprecatedValue:
pass
class ConfigValue:
"""Represents a config key's value and how to handle it.
Normally you will only be interacting with sub-classes for config values
that encode either deserialization behavior and/or validation.
Each config value should be used for the following actions:
1. Deserializing from a raw string and validating, raising ValueError on
failure.
2. Serializing a value back to a string that can be stored in a config.
3. Formatting a value to a printable form (useful for masking secrets).
:class:`None` values should not be deserialized, serialized or formatted,
the code interacting with the config should simply skip None config values.
"""
def deserialize(self, value):
"""Cast raw string to appropriate type."""
return decode(value)
def serialize(self, value, display=False):
"""Convert value back to string for saving."""
if value is None:
return ""
return str(value)
class Deprecated(ConfigValue):
"""Deprecated value.
Used for ignoring old config values that are no longer in use, but should
not cause the config parser to crash.
"""
def deserialize(self, value):
return DeprecatedValue()
def serialize(self, value, display=False):
return DeprecatedValue()
class String(ConfigValue):
"""String value.
Is decoded as utf-8 and \\n \\t escapes should work and be preserved.
"""
def __init__(self, optional=False, choices=None):
self._required = not optional
self._choices = choices
def deserialize(self, value):
value = decode(value).strip()
validators.validate_required(value, self._required)
if not value:
return None
validators.validate_choice(value, self._choices)
return value
def serialize(self, value, display=False):
if value is None:
return ""
return encode(value)
class Secret(String):
"""Secret string value.
Is decoded as utf-8 and \\n \\t escapes should work and be preserved.
Should be used for passwords, auth tokens etc. Will mask value when being
displayed.
"""
def __init__(self, optional=False, choices=None):
self._required = not optional
self._choices = None # Choices doesn't make sense for secrets
def serialize(self, value, display=False):
if value is not None and display:
return "********"
return super().serialize(value, display)
class Integer(ConfigValue):
"""Integer value."""
def __init__(
self, minimum=None, maximum=None, choices=None, optional=False
):
self._required = not optional
self._minimum = minimum
self._maximum = maximum
self._choices = choices
def deserialize(self, value):
value = decode(value)
validators.validate_required(value, self._required)
if not value:
return None
value = int(value)
validators.validate_choice(value, self._choices)
validators.validate_minimum(value, self._minimum)
validators.validate_maximum(value, self._maximum)
return value
class Boolean(ConfigValue):
"""Boolean value.
Accepts ``1``, ``yes``, ``true``, and ``on`` with any casing as
:class:`True`.
Accepts ``0``, ``no``, ``false``, and ``off`` with any casing as
:class:`False`.
"""
true_values = ("1", "yes", "true", "on")
false_values = ("0", "no", "false", "off")
def __init__(self, optional=False):
self._required = not optional
def deserialize(self, value):
value = decode(value)
validators.validate_required(value, self._required)
if not value:
return None
if value.lower() in self.true_values:
return True
elif value.lower() in self.false_values:
return False
raise ValueError(f"invalid value for boolean: {value!r}")
def serialize(self, value, display=False):
if value is True:
return "true"
elif value in (False, None):
return "false"
else:
raise ValueError(f"{value!r} is not a boolean")
class List(ConfigValue):
"""List value.
Supports elements split by commas or newlines. Newlines take presedence and
empty list items will be filtered out.
"""
def __init__(self, optional=False):
self._required = not optional
def deserialize(self, value):
value = decode(value)
if "\n" in value:
values = re.split(r"\s*\n\s*", value)
else:
values = re.split(r"\s*,\s*", value)
values = tuple(v.strip() for v in values if v.strip())
validators.validate_required(values, self._required)
return tuple(values)
def serialize(self, value, display=False):
if not value:
return ""
return "\n " + "\n ".join(encode(v) for v in value if v)
class LogColor(ConfigValue):
def deserialize(self, value):
value = decode(value)
validators.validate_choice(value.lower(), log.COLORS)
return value.lower()
def serialize(self, value, display=False):
if value.lower() in log.COLORS:
return encode(value.lower())
return ""
class LogLevel(ConfigValue):
"""Log level value.
Expects one of ``critical``, ``error``, ``warning``, ``info``, ``debug``,
or ``all``, with any casing.
"""
levels = {
"critical": logging.CRITICAL,
"error": logging.ERROR,
"warning": logging.WARNING,
"info": logging.INFO,
"debug": logging.DEBUG,
"trace": log.TRACE_LOG_LEVEL,
"all": logging.NOTSET,
}
def deserialize(self, value):
value = decode(value)
validators.validate_choice(value.lower(), self.levels.keys())
return self.levels.get(value.lower())
def serialize(self, value, display=False):
lookup = {v: k for k, v in self.levels.items()}
if value in lookup:
return encode(lookup[value])
return ""
class Hostname(ConfigValue):
"""Network hostname value."""
def __init__(self, optional=False):
self._required = not optional
def deserialize(self, value, display=False):
value = decode(value).strip()
validators.validate_required(value, self._required)
if not value:
return None
socket_path = path.get_unix_socket_path(value)
if socket_path is not None:
path_str = Path(not self._required).deserialize(socket_path)
return f"unix:{path_str}"
try:
socket.getaddrinfo(value, None)
except OSError:
raise ValueError("must be a resolveable hostname or valid IP")
return value
class Port(Integer):
"""Network port value.
Expects integer in the range 0-65535, zero tells the kernel to simply
allocate a port for us.
"""
def __init__(self, choices=None, optional=False):
super().__init__(
minimum=0, maximum=2 ** 16 - 1, choices=choices, optional=optional
)
class _ExpandedPath(str):
def __new__(cls, original, expanded):
return super().__new__(cls, expanded)
def __init__(self, original, expanded):
self.original = original
class Path(ConfigValue):
"""File system path.
The following expansions of the path will be done:
- ``~`` to the current user's home directory
- ``$XDG_CACHE_DIR`` according to the XDG spec
- ``$XDG_CONFIG_DIR`` according to the XDG spec
- ``$XDG_DATA_DIR`` according to the XDG spec
- ``$XDG_MUSIC_DIR`` according to the XDG spec
"""
def __init__(self, optional=False):
self._required = not optional
def deserialize(self, value):
value = decode(value).strip()
expanded = path.expand_path(value)
validators.validate_required(value, self._required)
validators.validate_required(expanded, self._required)
if not value or expanded is None:
return None
return _ExpandedPath(value, expanded)
def serialize(self, value, display=False):
if isinstance(value, _ExpandedPath):
value = value.original
if isinstance(value, bytes):
value = value.decode(errors="surrogateescape")
return value

View File

@@ -0,0 +1,39 @@
# TODO: add validate regexp?
def validate_required(value, required):
"""Validate that ``value`` is set if ``required``
Normally called in :meth:`~mopidy.config.types.ConfigValue.deserialize` on
the raw string, _not_ the converted value.
"""
if required and not value:
raise ValueError("must be set.")
def validate_choice(value, choices):
"""Validate that ``value`` is one of the ``choices``
Normally called in :meth:`~mopidy.config.types.ConfigValue.deserialize`.
"""
if choices is not None and value not in choices:
names = ", ".join(repr(c) for c in choices)
raise ValueError(f"must be one of {names}, not {value}.")
def validate_minimum(value, minimum):
"""Validate that ``value`` is at least ``minimum``
Normally called in :meth:`~mopidy.config.types.ConfigValue.deserialize`.
"""
if minimum is not None and value < minimum:
raise ValueError(f"{value!r} must be larger than {minimum!r}.")
def validate_maximum(value, maximum):
"""Validate that ``value`` is at most ``maximum``
Normally called in :meth:`~mopidy.config.types.ConfigValue.deserialize`.
"""
if maximum is not None and value > maximum:
raise ValueError(f"{value!r} must be smaller than {maximum!r}.")