# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import json
import os
import platform
import re
import signal
import subprocess
import sys
from pathlib
import Path
from mozlint
import result
here = os.path.abspath(os.path.dirname(__file__))
def default_bindir():
# We use sys.prefix to find executables as that gets modified with
# virtualenv's activate_this.py, whereas sys.executable doesn't.
if platform.system() ==
"Windows":
return os.path.join(sys.prefix,
"Scripts")
else:
return os.path.join(sys.prefix,
"bin")
def get_ruff_version(binary):
"""
Returns found binary
's version
"""
try:
output = subprocess.check_output(
[binary,
"--version"],
stderr=subprocess.STDOUT,
text=
True,
)
except subprocess.CalledProcessError
as e:
output = e.output
matches = re.match(r
"ruff ([0-9\.]+)", output)
if matches:
return matches[1]
print(
"Error: Could not parse the version '{}'".format(output))
def run_process(config, cmd, **kwargs):
orig = signal.signal(signal.SIGINT, signal.SIG_IGN)
proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=
True
)
signal.signal(signal.SIGINT, orig)
try:
output, _ = proc.communicate()
proc.wait()
except KeyboardInterrupt:
proc.kill()
return output
def lint(paths, config, log, **lintargs):
fixed = 0
results = []
if not paths:
return {
"results": results,
"fixed": fixed}
# Currently ruff only lints non `.py` files if they are explicitly passed
# in. So we need to find any non-py files manually. This can be removed
# after https://github.com/charliermarsh/ruff/issues/3410 is fixed.
exts = [e
for e
in config[
"extensions"]
if e !=
"py"]
non_py_files = []
for path
in paths:
p = Path(path)
if not p.is_dir():
continue
for ext
in exts:
non_py_files.extend([str(f)
for f
in p.glob(f
"**/*.{ext}")])
args = [
"ruff",
"check",
"--force-exclude"] + paths + non_py_files
if config.get(
"exclude"):
args.append(f
"--extend-exclude={','.join(config['exclude'])}")
process_kwargs = {
"processStderrLine":
lambda line: log.debug(line)}
warning_rules = set(config.get(
"warning-rules", []))
if lintargs.get(
"fix"):
# Do a first pass with --fix-only as the json format doesn't return the
# number of fixed issues.
fix_args = args + [
"--fix-only"]
if not lintargs.get(
"warning"):
# Don't fix warnings to limit unrelated changes sneaking into patches.
# except when --fix -W is passed
fix_args.append(f
"--extend-ignore={','.join(warning_rules)}")
log.debug(f
"Running --fix: {fix_args}")
output = run_process(config, fix_args, **process_kwargs)
matches = re.match(r
"Fixed (\d+) errors?.", output)
if matches:
fixed = int(matches[1])
args += [
"--output-format=json"]
log.debug(f
"Running with args: {args}")
output = run_process(config, args, **process_kwargs)
if not output:
return []
try:
issues = json.loads(output)
except json.JSONDecodeError:
log.error(f
"could not parse output: {output}")
return []
for issue
in issues:
res = {
"path": issue[
"filename"],
"lineno": issue[
"location"][
"row"],
"column": issue[
"location"][
"column"],
"lineoffset": issue[
"end_location"][
"row"] - issue[
"location"][
"row"],
"message": issue[
"message"],
"rule": issue[
"code"],
"level":
"error",
}
if any(issue[
"code"].startswith(w)
for w
in warning_rules):
res[
"level"] =
"warning"
if issue[
"fix"]:
res[
"hint"] = issue[
"fix"][
"message"]
results.append(result.from_config(config, **res))
return {
"results": results,
"fixed": fixed}