# -*- coding: utf-8 -*-
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
Code for parsing metrics.yaml, pings.yaml, and tags.yaml files.
"""
import functools
from pathlib
import Path
import textwrap
from typing
import Any, cast, Dict, Generator, Iterable, Optional, Set, Tuple, Union
import jsonschema
# type: ignore
from jsonschema.exceptions
import ValidationError
# type: ignore
from .metrics
import Metric, ObjectTree
from .pings
import Ping, RESERVED_PING_NAMES
from .tags
import Tag
from .
import util
from .util
import DictWrapper
# Directory that contains this module.
ROOT_DIR = Path(__file__).parent
# Directory that `_load_schemas` globs for `*.yaml` schema files.
SCHEMAS_DIR = ROOT_DIR / "schemas"

# Schema identifiers, one per supported file type.
# NOTE(review): presumably these match the `$id` of the bundled schema
# files -- confirm against the files in SCHEMAS_DIR.
METRICS_ID = "moz://mozilla.org/schemas/glean/metrics/2-0-0"
PINGS_ID = "moz://mozilla.org/schemas/glean/pings/2-0-0"
TAGS_ID = "moz://mozilla.org/schemas/glean/tags/1-0-0"
def _update_validator(validator):
"""
Adds some custom validators to the jsonschema validator that produce
nicer error messages.
"""
def required(validator, required, instance, schema):
if not validator.is_type(instance,
"object"):
return
missing_properties = set(
property
for property
in required
if property
not in instance
)
if len(missing_properties):
missing_properties = sorted(list(missing_properties))
yield ValidationError(
f
"Missing required properties: {', '.join(missing_properties)}"
)
validator.VALIDATORS[
"required"] = required
def _load_file(
    filepath: Path, parser_config: Dict[str, Any]
) -> Generator[str, None, Tuple[Dict[str, util.JSONType], Optional[str]]]:
    """
    Read and validate a single metrics.yaml, pings.yaml or tags.yaml file.

    Problems are reported by yielding formatted error strings. The
    generator's return value is a `(content, filetype)` pair, where
    `filetype` is `"metrics"`, `"pings"`, `"tags"` or `None`. `content` is
    empty whenever the file could not be loaded or failed validation.

    A missing `filepath` raises `FileNotFoundError`, unless
    `parser_config["allow_missing_files"]` is `True`.

    :raises TypeError: If the `$schema` value is missing or not a string.
    """
    try:
        content = util.load_yaml_or_json(filepath)
    except FileNotFoundError:
        if parser_config.get("allow_missing_files", False):
            return {}, None
        raise
    except Exception as e:
        yield util.format_error(filepath, "", textwrap.fill(str(e)))
        return {}, None

    if content is None:
        yield util.format_error(filepath, "", f"'{filepath}' file can not be empty.")
        return {}, None

    # Anything other than a non-empty mapping carries nothing to parse.
    if not isinstance(content, dict) or content == {}:
        return {}, None

    schema_key = content.get("$schema")
    if not isinstance(schema_key, str):
        raise TypeError(f"Invalid schema key {schema_key}")

    # The file type is the second-to-last "/"-separated piece of the schema id.
    pieces = schema_key.split("/")
    filetype: Optional[str] = pieces[-2] if len(pieces) >= 2 else None
    if filetype not in ("metrics", "pings", "tags"):
        filetype = None

    # Any validation error discards the content but keeps the file type.
    is_valid = True
    for error in validate(content, filepath):
        is_valid = False
        yield error

    return (content if is_valid else {}), filetype
@functools.lru_cache(maxsize=1)
def _load_schemas() -> Dict[str, Tuple[Any, Any]]:
    """
    Read every `*.yaml` schema in `SCHEMAS_DIR` and index it by its `$id`.

    Each entry is a `(schema, validator)` pair. The result is cached, so the
    files are only read on the first call.
    """
    by_id: Dict[str, Tuple[Any, Any]] = {}
    for path in SCHEMAS_DIR.glob("*.yaml"):
        document = util.load_yaml_or_json(path)
        null_resolver = util.get_null_resolver(document)
        checker_class = jsonschema.validators.validator_for(document)
        # Patch in the friendlier "required" check before the class is used.
        _update_validator(checker_class)
        checker_class.check_schema(document)
        checker = checker_class(document, resolver=null_resolver)
        by_id[document["$id"]] = (document, checker)
    return by_id
def _get_schema(
    schema_id: str, filepath: Union[str, Path] = "<input>"
) -> Tuple[Any, Any]:
    """
    Look up the `(schema, validator)` pair registered under `schema_id`.

    :param schema_id: The schema `$id` to look up.
    :param filepath: Source file name, used only in the error message.
    :raises ValueError: If no loaded schema has that `$id`.
    """
    known = _load_schemas()
    if schema_id in known:
        return known[schema_id]

    choices = ", ".join(known.keys())
    message = util.format_error(
        filepath, "", f"$schema key must be one of {choices}"
    )
    raise ValueError(message)
def _get_schema_for_content(
content: Dict[str, util.JSONType], filepath: Union[str, Path]
) -> Tuple[Any, Any]:
"""
Get the appropriate schema
for the given JSON content.
"""
schema_url = content.get(
"$schema")
if not isinstance(schema_url, str):
raise TypeError(
"Invalid $schema type {schema_url}")
return _get_schema(schema_url, filepath)
def validate(
    content: Dict[str, util.JSONType], filepath: Union[str, Path] = "<input>"
) -> Generator[str, None, None]:
    """
    Check `content` against the schema named by its `$schema` key.

    Yields one formatted error string per problem found. An unknown schema
    id is reported as a single error rather than raised.
    """
    try:
        _schema, checker = _get_schema_for_content(content, filepath)
    except ValueError as lookup_error:
        yield str(lookup_error)
        return

    for problem in checker.iter_errors(content):
        yield util.format_error(filepath, "", util.pprint_validation_error(problem))
def _instantiate_metrics(
    all_objects: ObjectTree,
    sources: Dict[Any, Path],
    content: Dict[str, util.JSONType],
    filepath: Path,
    config: Dict[str, Any],
) -> Generator[str, None, None]:
    """
    Convert the content of one metrics.yaml file into Metric objects and
    merge them into `all_objects`.

    :param all_objects: Tree of category name -> metric name -> object,
        updated in place.
    :param sources: Maps `(category, metric)` to the file that first defined
        it. Used to detect duplicates and updated in place.
    :param content: The loaded file content.
    :param filepath: The file `content` came from, used in error messages.
    :param config: Parser options; `allow_reserved` is consulted here.
    :returns: A generator over formatted error messages.
    :raises TypeError: If a category's value is not a dictionary.
    """
    global_no_lint = content.get("no_lint", [])
    global_tags = content.get("$tags", [])
    assert isinstance(global_tags, list)

    for category_key, category_val in sorted(content.items()):
        # `$`-prefixed keys and `no_lint` are file-level settings, not
        # categories.
        if category_key.startswith("$") or category_key == "no_lint":
            continue
        if not config.get("allow_reserved") and category_key.split(".")[0] == "glean":
            yield util.format_error(
                filepath,
                f"For category '{category_key}'",
                "Categories beginning with 'glean' are reserved for "
                "Glean internal use.",
            )
            continue
        all_objects.setdefault(category_key, DictWrapper())

        if not isinstance(category_val, dict):
            raise TypeError(f"Invalid content for {category_key}")

        for metric_key, metric_val in sorted(category_val.items()):
            try:
                metric_obj = Metric.make_metric(
                    category_key, metric_key, metric_val, validated=True, config=config
                )
            except Exception as e:
                yield util.format_error(
                    filepath,
                    f"On instance {category_key}.{metric_key}",
                    str(e),
                    metric_val.defined_in["line"],
                )
                metric_obj = None
            else:
                if (
                    not config.get("allow_reserved")
                    and "all-pings" in metric_obj.send_in_pings
                ):
                    yield util.format_error(
                        filepath,
                        f"On instance {category_key}.{metric_key}",
                        'Only internal metrics may specify "all-pings" '
                        'in "send_in_pings"',
                        metric_val.defined_in["line"],
                    )
                    metric_obj = None

            if metric_obj is not None:
                metric_obj.no_lint = sorted(set(metric_obj.no_lint + global_no_lint))
                if global_tags:
                    metric_obj.metadata["tags"] = sorted(
                        set(metric_obj.metadata.get("tags", []) + global_tags)
                    )

                if isinstance(filepath, Path):
                    metric_obj.defined_in["filepath"] = str(filepath)

            already_seen = sources.get((category_key, metric_key))
            if already_seen is not None:
                # We've seen this metric name already.  A metric that failed
                # to build has no object to ask for its location, so fall
                # back to the raw definition, as the error branches above do.
                located = metric_obj if metric_obj is not None else metric_val
                yield util.format_error(
                    filepath,
                    "",
                    (
                        f"Duplicate metric name '{category_key}.{metric_key}' "
                        f"already defined in '{already_seen}'"
                    ),
                    located.defined_in["line"],
                )
            else:
                # A metric that failed to build is still recorded (as None),
                # so its name stays claimed by this file.
                all_objects[category_key][metric_key] = metric_obj
                sources[(category_key, metric_key)] = filepath
def _instantiate_pings(
    all_objects: ObjectTree,
    sources: Dict[Any, Path],
    content: Dict[str, util.JSONType],
    filepath: Path,
    config: Dict[str, Any],
) -> Generator[str, None, None]:
    """
    Convert the content of one pings.yaml file into Ping objects and store
    them under the `"pings"` category of `all_objects`.

    :param all_objects: Object tree, updated in place.
    :param sources: Maps a ping name to the file that first defined it. Used
        to detect duplicates and updated in place.
    :param content: The loaded file content.
    :param filepath: The file `content` came from, used in error messages.
    :param config: Parser options; `allow_reserved` is consulted here.
    :returns: A generator over formatted error messages.
    :raises TypeError: If a ping's value is not a dictionary.
    """
    global_no_lint = content.get("no_lint", [])
    assert isinstance(global_no_lint, list)
    # Maps a ping named in some `ping_schedule` to the pings that listed it.
    ping_schedule_reverse_map: Dict[str, Set[str]] = {}

    for ping_key, ping_val in sorted(content.items()):
        # `$`-prefixed keys and `no_lint` are file-level settings, not pings.
        if ping_key.startswith("$") or ping_key == "no_lint":
            continue
        if not config.get("allow_reserved") and ping_key in RESERVED_PING_NAMES:
            yield util.format_error(
                filepath,
                f"For ping '{ping_key}'",
                f"Ping uses a reserved name ({RESERVED_PING_NAMES})",
            )
            continue
        if not isinstance(ping_val, dict):
            raise TypeError(f"Invalid content for ping {ping_key}")
        ping_val["name"] = ping_key

        if "metadata" in ping_val and "ping_schedule" in ping_val["metadata"]:
            schedule = ping_val["metadata"]["ping_schedule"]
            if ping_key in schedule:
                yield util.format_error(
                    filepath,
                    f"For ping '{ping_key}'",
                    "ping_schedule contains its own ping name",
                )
                continue
            for ping_schedule in schedule:
                ping_schedule_reverse_map.setdefault(ping_schedule, set()).add(
                    ping_key
                )

        try:
            ping_obj = Ping(
                defined_in=getattr(ping_val, "defined_in", None),
                _validated=True,
                **ping_val,
            )
        except Exception as e:
            yield util.format_error(filepath, f"On instance '{ping_key}'", str(e))
            continue

        ping_obj.no_lint = sorted(set(ping_obj.no_lint + global_no_lint))
        if isinstance(filepath, Path) and ping_obj.defined_in is not None:
            ping_obj.defined_in["filepath"] = str(filepath)

        already_seen = sources.get(ping_key)
        if already_seen is not None:
            # We've seen this ping name already
            yield util.format_error(
                filepath,
                "",
                f"Duplicate ping name '{ping_key}' "
                f"already defined in '{already_seen}'",
            )
        else:
            all_objects.setdefault("pings", {})[ping_key] = ping_obj
            sources[ping_key] = filepath

    # The "pings" category may not exist if no ping was ever stored (for
    # example when every construction above failed), so look it up without
    # indexing.
    known_pings = all_objects.get("pings", {})
    for scheduler, scheduled in ping_schedule_reverse_map.items():
        scheduler_obj = known_pings.get(scheduler)
        if isinstance(scheduler_obj, Ping):
            scheduler_obj.schedules_pings = sorted(scheduled)
def _instantiate_tags(
    all_objects: ObjectTree,
    sources: Dict[Any, Path],
    content: Dict[str, util.JSONType],
    filepath: Path,
    config: Dict[str, Any],
) -> Generator[str, None, None]:
    """
    Turn the content of one tags.yaml file into Tag objects and store them
    under the `"tags"` category of `all_objects`.

    Yields a formatted error string for each tag that cannot be built or
    whose name was already defined by an earlier file.

    :raises TypeError: If a tag's value is not a dictionary.
    """
    shared_no_lint = content.get("no_lint", [])
    assert isinstance(shared_no_lint, list)

    for tag_key, tag_val in sorted(content.items()):
        # `$`-prefixed keys and `no_lint` are file-level settings, not tags.
        if tag_key.startswith("$") or tag_key == "no_lint":
            continue
        if not isinstance(tag_val, dict):
            raise TypeError(f"Invalid content for tag {tag_key}")
        tag_val["name"] = tag_key

        try:
            tag_obj = Tag(
                defined_in=getattr(tag_val, "defined_in", None),
                _validated=True,
                **tag_val,
            )
        except Exception as e:
            yield util.format_error(filepath, f"On instance '{tag_key}'", str(e))
            continue

        if tag_obj is not None:
            combined = set(tag_obj.no_lint + shared_no_lint)
            tag_obj.no_lint = sorted(combined)

            if isinstance(filepath, Path) and tag_obj.defined_in is not None:
                tag_obj.defined_in["filepath"] = str(filepath)

        already_seen = sources.get(tag_key)
        if already_seen is None:
            # First definition of this name: record it and where it came from.
            all_objects.setdefault("tags", {})[tag_key] = tag_obj
            sources[tag_key] = filepath
            continue

        # We've seen this tag name already
        yield util.format_error(
            filepath,
            "",
            f"Duplicate tag name '{tag_key}' "
            f"already defined in '{already_seen}'",
        )
def _preprocess_objects(objs: ObjectTree, config: Dict[str, Any]) -> ObjectTree:
    """
    Apply defaults to every `Metric` in the tree, in place.

    Unless `config["do_not_disable_expired"]` is set, `disabled` is
    recomputed from `is_disabled()`. A `"default"` entry in `send_in_pings`
    is replaced by the metric's `default_store_names`, and the list is then
    de-duplicated and sorted.

    :returns: The same `objs` tree that was passed in.
    """
    metrics_only = (
        entry
        for group in objs.values()
        for entry in group.values()
        if isinstance(entry, Metric)
    )

    for metric in metrics_only:
        keep_expired = config.get("do_not_disable_expired", False)
        if not keep_expired and hasattr(metric, "is_disabled"):
            metric.disabled = metric.is_disabled()

        if not hasattr(metric, "send_in_pings"):
            continue

        if "default" in metric.send_in_pings:
            metric.send_in_pings = metric.default_store_names + [
                name for name in metric.send_in_pings if name != "default"
            ]
        metric.send_in_pings = sorted(set(metric.send_in_pings))

    return objs
@util.keep_value
def parse_objects(
    filepaths: Iterable[Path], config: Optional[Dict[str, Any]] = None
) -> Generator[str, None, ObjectTree]:
    """
    Parse one or more metrics.yaml, pings.yaml and/or tags.yaml files,
    returning a tree of `metrics.Metric`, `pings.Ping`, and `tags.Tag`
    instances.

    The result is a generator over any errors.  If there are no errors, the
    actual metrics can be obtained from `result.value`.  For example::

      result = parse_objects(filepaths)
      for err in result:
          print(err)
      all_metrics = result.value

    The result value is a dictionary of category names to categories, where
    each category is a dictionary from metric name to `metrics.Metric`
    instances.  There are also the special categories `pings` and `tags`
    containing all of the `pings.Ping` and `tags.Tag` instances,
    respectively.

    :param filepaths: list of Path objects to metrics.yaml, pings.yaml,
        and/or tags.yaml files
    :param config: A dictionary of options that change parsing behavior.
        Supported keys are:

        - `allow_reserved`: Allow values reserved for internal Glean use.
        - `do_not_disable_expired`: Don't mark expired metrics as disabled.
          This is useful when you want to retain the original "disabled"
          value from the `metrics.yaml`, rather than having it overridden
          when the metric expires.
        - `allow_missing_files`: Do not raise a `FileNotFoundError` if any
          of the input `filepaths` do not exist.
        - `interesting`: Contains an array of interesting metrics/ping
          files.  Probes not included in these files will be marked as
          disabled.
    :raises TypeError: If an "interesting" file has a category whose value
        is not a dictionary.
    """
    if config is None:
        config = {}

    all_objects: ObjectTree = DictWrapper()
    sources: Dict[Any, Path] = {}

    # One handler per file type; files of unknown type are loaded (so their
    # errors are reported) but contribute no objects.
    instantiators = {
        "metrics": _instantiate_metrics,
        "pings": _instantiate_pings,
        "tags": _instantiate_tags,
    }
    for filepath in util.ensure_list(filepaths):
        content, filetype = yield from _load_file(filepath, config)
        instantiate = instantiators.get(filetype)
        if instantiate is not None:
            yield from instantiate(all_objects, sources, content, filepath, config)

    interesting = config.get("interesting")
    if interesting:
        # We're configured to disable probes not included in the interesting list.
        interesting_metrics_dict: Dict[str, Dict[str, Any]] = {
            "metrics": DictWrapper(),
            "pings": DictWrapper(),
        }

        for filepath in util.ensure_list(interesting):
            content, _filetype = yield from _load_file(filepath, config)
            if not isinstance(content, dict):
                raise TypeError(f"Invalid content for {filepath}")
            for category_key, category_val in sorted(content.items()):
                # Skip file-level settings, as the instantiate helpers do:
                # `no_lint` is not a category and must not trip the
                # dictionary check below.
                if category_key.startswith("$") or category_key == "no_lint":
                    continue
                interesting_metrics_dict.setdefault(category_key, DictWrapper())
                if not isinstance(category_val, dict):
                    raise TypeError(f"Invalid category_val for {category_key}")
                for metric_key, metric_val in sorted(category_val.items()):
                    interesting_metrics_dict[category_key][metric_key] = metric_val

        for category_key, category_val in all_objects.items():
            if category_key == "tags":
                continue
            category_dict = interesting_metrics_dict.get(category_key, {})
            for metric_key, obj in sorted(category_val.items()):
                if metric_key not in category_dict and hasattr(obj, "disabled"):
                    obj.disabled = True

    return _preprocess_objects(all_objects, config)