def limit_to_number(name):
    """
    Turn a string limit like u32-max or s64-min into its numerical value

    The name is read as a sign letter ('u' or 's'), a bit width and a
    four character '-min' / '-max' suffix.
    """
    signed = name[0] == 's'
    wants_min = name.endswith('-min')

    # The unsigned minimum needs no width parsing at all
    if name[0] == 'u' and wants_min:
        return 0

    # Drop the sign letter and the suffix to get the bit count;
    # a signed type spends one of its bits on the sign
    bits = int(name[1:-4])
    if signed:
        bits -= 1

    limit = (1 << bits) - 1
    if signed and wants_min:
        return -limit - 1
    return limit
class BaseNlLib:
    """Source of code snippets used when rendering netlink access code."""

    def get_family_id(self):
        """Return the expression text used to refer to the family ID."""
        family_id_expr = 'ys->family_id'
        return family_id_expr
if nested:
self.nested_attrs = nested if self.nested_attrs == family.name:
self.nested_render_name = c_lower(f"{family.ident_name}") else:
self.nested_render_name = c_lower(f"{family.ident_name}_{self.nested_attrs}")
if self.nested_attrs in self.family.consts:
self.nested_struct_type = 'struct ' + self.nested_render_name + '_' else:
self.nested_struct_type = 'struct ' + self.nested_render_name
self.c_name = c_lower(self.name) if self.c_name in _C_KW:
self.c_name += '_' if self.c_name[0].isdigit():
self.c_name = '_' + self.c_name
# Added by resolve():
self.enum_name = None
delattr(self, "enum_name")
def _get_real_attr(self): # if the attr is for a subset return the "real" attr (just one down, does not recurse) return self.family.attr_sets[self.attr_set.subset_of][self.name]
def set_request(self):
    """
    Flag this attribute as used in requests.

    When the owning attribute set is a subset of another set, the flag is
    also set on the matching attribute of that other set.
    """
    self.request = True
    if not self.attr_set.subset_of:
        return
    self._get_real_attr().set_request()
def set_reply(self):
    """
    Flag this attribute as used in replies.

    When the owning attribute set is a subset of another set, the flag is
    also set on the matching attribute of that other set.
    """
    self.reply = True
    if not self.attr_set.subset_of:
        return
    self._get_real_attr().set_reply()
def get_limit(self, limit, default=None):
    """
    Return the numerical value of the check called @limit.

    The check is looked up in self.checks, falling back to @default.
    None and integers are returned as they are, the name of a family
    constant resolves to that constant's "value", and anything else is
    handed to limit_to_number().
    """
    value = self.checks.get(limit, default)
    if value is None or isinstance(value, int):
        return value

    consts = self.family.consts
    if value in consts:
        return consts[value]["value"]
    return limit_to_number(value)
def get_limit_str(self, limit, default=None, suffix=''):
    """
    Return the check called @limit as text.

    A missing check gives an empty string and an integer is rendered with
    @suffix appended.  A name goes through c_upper(); the name of a family
    constant which does not carry a 'header' property gets the family name
    prepended first.
    """
    value = self.checks.get(limit, default)
    if value is None:
        return ''
    if isinstance(value, int):
        return str(value) + suffix

    consts = self.family.consts
    if value not in consts or consts[value].get('header'):
        return c_upper(value)
    return c_upper(f"{self.family['name']}-{value}")
def presence_member(self, space, type_filter):
    """
    Return the presence-tracking member declaration for this attribute.

    Nothing (None) is returned unless the attribute's presence type equals
    @type_filter.  'present' gives a one-bit field, 'len' and 'count' give
    a full u32; the type is spelled with a '__' prefix in 'user' space.
    """
    kind = self.presence_type()
    if kind != type_filter:
        return None

    pfx = '__' if space == 'user' else ''
    if kind == 'present':
        return f"{pfx}u32 {self.c_name}:1;"
    if kind in {'len', 'count'}:
        return f"{pfx}u32 {self.c_name};"
    return None
def _complex_member_type(self, ri): returnNone
def free_needs_iter(self):
    """Tell whether freeing needs an iterator; this implementation says no."""
    needs_iter = False
    return needs_iter
def _free_lines(self, ri, var, ref): if self.is_multi_val() or self.presence_type() in {'count', 'len'}: return [f'free({var}->{ref}{self.c_name});'] return []
def free(self, ri, var, ref):
    """Write the statements from _free_lines() out through ri.cw."""
    for stmt in self._free_lines(ri, var, ref):
        ri.cw.p(stmt)
def arg_member(self, ri):
    """
    Return the argument declarations used to pass this attribute.

    Built from _complex_member_type(): a pointer to that type, followed by
    an element count argument when the presence type is 'count'.  Raises
    Exception when there is no complex member type.
    """
    member = self._complex_member_type(ri)
    if not member:
        raise Exception(f"Struct member not implemented for class type {self.type}")

    # No space between a type which already ends in '*' and our own '*'
    sep = '' if member[-1] == '*' else ' '
    args = [member + sep + '*' + self.c_name]
    if self.presence_type() == 'count':
        args.append('unsigned int n_' + self.c_name)
    return args
def struct_member(self, ri):
    """
    Write the struct member declaration(s) of this attribute to ri.cw.

    Without a complex member type the declarations come from arg_member().
    With one, a single member is written, turned into a pointer when the
    attribute is multi-value or recursive for the op being rendered.
    """
    member = self._complex_member_type(ri)
    if not member:
        for decl in self.arg_member(ri):
            ri.cw.p(decl + ';')
        return

    ptr = '*' if self.is_multi_val() else ''
    if self.is_recursive_for_op(ri):
        ptr = '*'
    # No space between a type which already ends in '*' and the name
    sep = '' if member[-1] == '*' else ' '
    ri.cw.p(f"{member}{sep}{ptr}{self.c_name};")
def _attr_put_line(self, ri, var, line):
presence = self.presence_type() if presence in {'present', 'len'}:
ri.cw.p(f"if ({var}->_{presence}.{self.c_name})")
ri.cw.p(f"{line};")
def _attr_put_simple(self, ri, var, put_type):
line = f"ynl_attr_put_{put_type}(nlh, {self.enum_name}, {var}->{self.c_name})"
self._attr_put_line(ri, var, line)
def attr_put(self, ri, var):
    """Always raise Exception: this implementation cannot put the attribute."""
    message = f"Put not implemented for class type {self.type}"
    raise Exception(message)
def _attr_get(self, ri, var): raise Exception(f"Attr get not implemented for class type {self.type}")
def attr_get(self, ri, var, first):
    """
    Write the branch which parses this attribute out of a message.

    The pieces come from _attr_get(): the parsing lines, optional init
    lines and optional local variable declarations.  They are wrapped in
    an "if" (or "else if" unless @first) block comparing the attribute
    type with this attribute's enum name.  Always returns True.
    """
    get_lines, init_lines, local_vars = self._attr_get(ri, var)
    # A lone string stands for a single line
    if type(get_lines) is str:
        get_lines = [get_lines]
    if type(init_lines) is str:
        init_lines = [init_lines]

    keyword = 'if' if first else 'else if'
    cw = ri.cw
    cw.block_start(line=f"{keyword} (type == {self.enum_name})")

    if local_vars:
        for decl in local_vars:
            cw.p(decl)
        cw.nl()

    if init_lines:
        cw.nl()
        for stmt in init_lines:
            cw.p(stmt)

    for stmt in get_lines:
        cw.p(stmt)
    cw.block_end()
    return True
def _setter_lines(self, ri, member, presence): raise Exception(f"Setter not implemented for class type {self.type}")
def setter(self, ri, space, direction, deref=False, ref=None, var="req"):
    """
    Write the static inline setter function for this attribute.

    @ref is the list of member names leading from @var down to the struct
    holding the attribute; the attribute's own name is appended to it.
    The function body marks presence along that path, frees a previous
    value (_free_lines()) and stores the new one (_setter_lines()).
    """
    ref = (ref if ref else []) + [self.c_name]
    member = f"{var}->{'.'.join(ref)}"

    local_vars = []
    if self.free_needs_iter():
        local_vars += ['unsigned int i;']

    code = []
    presence = ''
    last = len(ref) - 1
    for depth, name in enumerate(ref):
        path = '.'.join(ref[:depth] + [''])
        # Every layer below last is a nest, so we know it uses bit presence
        # last layer is "self" and may be a complex type
        if depth == last and self.presence_type() != 'present':
            presence = f"{var}->{path}_{self.presence_type()}.{name}"
            continue
        presence = f"{var}->{path}_present.{name}"
        code.append(presence + ' = 1;')

    ref_path = '.'.join(ref[:-1])
    if ref_path:
        ref_path += '.'
    code += self._free_lines(ri, var, ref_path)
    code += self._setter_lines(ri, member, presence)

    func_name = f"{op_prefix(ri, direction, deref=deref)}_set_{'_'.join(ref)}"
    frees = any('free(' in stmt for stmt in code)
    allocs = any('alloc(' in stmt for stmt in code)
    # A setter which frees without allocating gets a '__' name prefix
    if frees and not allocs:
        func_name = '__' + func_name

    func_args = [f'{type_name(ri, direction, deref=deref)} *{var}'] + self.arg_member(ri)
    ri.cw.write_func('static inline void', func_name, local_vars=local_vars,
                     body=code, args=func_args)
class TypeUnused(Type):
    """Attribute type whose presence type is the empty string."""

    def presence_type(self):
        """Return '' as the presence type."""
        no_presence = ''
        return no_presence
# Classic families have some funny enums, don't bother # computing checks, since we only need them for kernel policies ifnot family.is_classic():
self._init_checks()
for _, attr in ri.family.pure_nested_structs[self.nested_attrs].member_list(): if attr.is_recursive(): continue
attr.setter(ri, self.nested_attrs, direction, deref=deref, ref=ref,
var=var)
def _attr_typol(self):
typol = f'.type = YNL_PT_NEST, .nest = &{self.nested_render_name}_nest, '
typol += '.is_submsg = 1, ' # Reverse-parsing of the policy (ynl_err_walk() in ynl.c) does not # support external selectors. No family uses sub-messages with external # selector for requests so this is fine for now. ifnot self.selector.is_external():
typol += f'.selector_type = {self.attr_set[self["selector"]].value} ' return typol
class Selector:
    """
    Selector of a sub-message attribute.

    The selector is named by the "selector" property of the attribute.  It
    is "external" when no attribute of that name exists in the attribute
    set given to the constructor.
    """

    def __init__(self, msg_attr, attr_set):
        self.name = msg_attr["selector"]

        self._external = self.name not in attr_set
        if self._external:
            # The selector will need to get passed down thru the structs
            self.attr = None
        else:
            self.attr = attr_set[self.name]
            self.attr.is_selector = True

    def set_attr(self, attr):
        """Record @attr as the attribute acting as the selector."""
        self.attr = attr

    def is_external(self):
        """Return True if the selector was not found in the attribute set."""
        return self._external
class Struct:
    """
    Struct built from the attributes of one attribute set.

    With @type_list only the listed attributes become members, otherwise
    all attributes of the set do and the struct is considered nested.
    """

    def __init__(self, family, space_name, type_list=None, fixed_header=None,
                 inherited=None, submsg=None):
        self.family = family
        self.space_name = space_name
        self.attr_set = family.attr_sets[space_name]
        # Use list to catch comparisons with empty sets
        self._inherited = inherited if inherited is not None else []
        self.inherited = []
        self.fixed_header = 'struct ' + c_lower(fixed_header) if fixed_header else None
        self.submsg = submsg

        self.nested = type_list is None
        if family.name == c_lower(space_name):
            self.render_name = c_lower(family.ident_name)
        else:
            self.render_name = c_lower(family.ident_name + '-' + space_name)
        self.struct_name = 'struct ' + self.render_name
        # Extra underscore when a nested struct shares its name with a constant
        if self.nested and space_name in family.consts:
            self.struct_name += '_'
        self.ptr_name = self.struct_name + ' *'

        # All attr sets this one contains, directly or multiple levels down
        self.child_nests = set()

        self.request = False
        self.reply = False
        self.recursive = False
        self.in_multi_val = False  # used by a MultiAttr and legacy arrays

        member_names = type_list if type_list is not None else self.attr_set
        self.attr_list = []
        self.attrs = dict()
        for member_name in member_names:
            self.attr_list.append((member_name, self.attr_set[member_name]))

        # Later members win ties, hence the >=
        highest = 0
        self.attr_max_val = None
        for name, attr in self.attr_list:
            if attr.value >= highest:
                highest = attr.value
                self.attr_max_val = attr
            self.attrs[name] = attr

    def set_inherited(self, new_inherited):
        """
        Fix the list of inherited members.

        Raises Exception if @new_inherited differs from what the struct was
        created with; the result is stored sorted and in C naming.
        """
        if self._inherited != new_inherited:
            raise Exception("Inheriting different members not supported")
        self.inherited = [c_lower(member) for member in sorted(self._inherited)]

    def external_selectors(self):
        """Return the external selectors of the sub-message members."""
        return [attr.selector for _, attr in self.attr_list
                if isinstance(attr, TypeSubMessage) and attr.selector.is_external()]

    def free_needs_iter(self):
        """Return True if freeing any of the members needs an iterator."""
        for _, attr in self.attr_list:
            if attr.free_needs_iter():
                return True
        return False
# Added by resolve:
self.c_name = None
delattr(self, "c_name")
def resolve(self):
    """
    Compute self.c_name from the name.

    A name clashing with an entry of _C_KW gets a trailing underscore and
    a name equal to the family's C name becomes the empty string.
    """
    c_name = c_lower(self.name)
    if c_name in _C_KW:
        c_name += '_'
    if c_name == self.family.c_name:
        c_name = ''
    self.c_name = c_name
def new_attr(self, elem, value):
    """
    Create the typed attribute object for the spec element @elem.

    The class is picked from elem['type'] (and for 'binary' and
    'indexed-array' from further properties); an element with a true
    'multi-attr' property gets wrapped in TypeMultiAttr.  Raises Exception
    for types and sub-types without a class.
    """
    kind = elem['type']

    if kind in scalars:
        cls = TypeScalar
    elif kind == 'unused':
        cls = TypeUnused
    elif kind == 'pad':
        cls = TypePad
    elif kind == 'flag':
        cls = TypeFlag
    elif kind == 'string':
        cls = TypeString
    elif kind == 'binary':
        if 'struct' in elem:
            cls = TypeBinaryStruct
        elif elem.get('sub-type') in scalars:
            cls = TypeBinaryScalarArray
        else:
            cls = TypeBinary
    elif kind == 'bitfield32':
        cls = TypeBitfield32
    elif kind == 'nest':
        cls = TypeNest
    elif kind == 'indexed-array' and 'sub-type' in elem:
        if elem["sub-type"] not in ['binary', 'nest', 'u32']:
            raise Exception(f'new_attr: unsupported sub-type {elem["sub-type"]}')
        cls = TypeArrayNest
    elif kind == 'nest-type-value':
        cls = TypeNestTypeValue
    elif kind == 'sub-message':
        cls = TypeSubMessage
    else:
        raise Exception(f"No typed class for type {elem['type']}")

    t = cls(self.family, self, elem, value)
    if 'multi-attr' in elem and elem['multi-attr']:
        t = TypeMultiAttr(self.family, self, elem, value, t)
    return t
class Operation(SpecOperation):
    def __init__(self, family, yaml, req_value, rsp_value):
        # Fill in missing operation properties (for fixed hdr-only msgs):
        # every request / reply section which exists gets an 'attributes'
        # list, sections which are absent are left alone
        for mode in ('do', 'dump', 'event'):
            for direction in ('request', 'reply'):
                try:
                    section = yaml[mode][direction]
                except KeyError:
                    continue
                section.setdefault('attributes', [])
self.hooks = dict() for when in ['pre', 'post']:
self.hooks[when] = dict() for op_mode in ['do', 'dump']:
self.hooks[when][op_mode] = dict()
self.hooks[when][op_mode]['set'] = set()
self.hooks[when][op_mode]['list'] = []
def _mark_notify(self): for op in self.msgs.values(): if'notify'in op:
self.ops[op['notify']].mark_has_ntf()
# Fake a 'do' equivalent of all events, so that we can render their response parsing def _mock_up_events(self): for op in self.yaml['operations']['list']: if'event'in op:
op['do'] = { 'reply': { 'attributes': op['event']['attributes']
}
}
def _load_root_sets(self): for op_name, op in self.msgs.items(): if'attribute-set'notin op: continue
req_attrs = set()
rsp_attrs = set() for op_mode in ['do', 'dump']: if op_mode in op and'request'in op[op_mode]:
req_attrs.update(set(op[op_mode]['request']['attributes'])) if op_mode in op and'reply'in op[op_mode]:
rsp_attrs.update(set(op[op_mode]['reply']['attributes'])) if'event'in op:
rsp_attrs.update(set(op['event']['attributes']))
def _sort_pure_types(self): # Try to reorder according to dependencies
pns_key_list = list(self.pure_nested_structs.keys())
pns_key_seen = set()
rounds = len(pns_key_list) ** 2 # it's basically bubble sort for _ in range(rounds): if len(pns_key_list) == 0: break
name = pns_key_list.pop(0)
finished = True for _, spec in self.attr_sets[name].items(): if'nested-attributes'in spec:
nested = spec['nested-attributes'] elif'sub-message'in spec:
nested = spec.sub_message else: continue
# If the unknown nest we hit is recursive it's fine, it'll be a pointer if self.pure_nested_structs[nested].recursive: continue if nested notin pns_key_seen: # Dicts are sorted, this will make struct last
struct = self.pure_nested_structs.pop(name)
self.pure_nested_structs[name] = struct
finished = False break if finished:
pns_key_seen.add(name) else:
pns_key_list.append(name)
def _load_nested_set_nest(self, spec):
inherit = set()
nested = spec['nested-attributes'] if nested notin self.root_sets: if nested notin self.pure_nested_structs:
self.pure_nested_structs[nested] = \
Struct(self, nested, inherited=inherit,
fixed_header=spec.get('fixed-header')) else: raise Exception(f'Using attr set as root and nested not supported - {nested}')
if'type-value'in spec: if nested in self.root_sets: raise Exception("Inheriting members to a space used as root not supported")
inherit.update(set(spec['type-value'])) elif spec['type'] == 'indexed-array':
inherit.add('idx')
self.pure_nested_structs[nested].set_inherited(inherit)
return nested
# Look up the sub-message named by spec["sub-message"] in self.sub_msgs and
# use the name of that sub-message as the name of the nested struct.
# NOTE(review): callers assign the result of this method, but no return
# statement is visible in this part of the body - confirm that the rest of
# the method returns `nested`.
def _load_nested_set_submsg(self, spec): # Fake the struct type for the sub-message itself # its not a attr_set but codegen wants attr_sets.
submsg = self.sub_msgs[spec["sub-message"]]
nested = submsg.name
while len(attr_set_queue):
a_set = attr_set_queue.pop(0) for attr, spec in self.attr_sets[a_set].items(): if'nested-attributes'in spec:
nested = self._load_nested_set_nest(spec) elif'sub-message'in spec:
nested = self._load_nested_set_submsg(spec) else: continue
if nested notin attr_set_seen:
attr_set_queue.append(nested)
attr_set_seen.add(nested)
for root_set, rs_members in self.root_sets.items(): for attr, spec in self.attr_sets[root_set].items(): if'nested-attributes'in spec:
nested = spec['nested-attributes'] elif'sub-message'in spec:
nested = spec.sub_message else:
nested = None
if nested: if attr in rs_members['request']:
self.pure_nested_structs[nested].request = True if attr in rs_members['reply']:
self.pure_nested_structs[nested].reply = True
if spec.is_multi_val():
child = self.pure_nested_structs.get(nested)
child.in_multi_val = True
self._sort_pure_types()
# Propagate the request / reply / recursive for attr_set, struct in reversed(self.pure_nested_structs.items()): for _, spec in self.attr_sets[attr_set].items(): if attr_set in struct.child_nests:
struct.recursive = True
struct.child_nests.add(child_name)
child = self.pure_nested_structs.get(child_name) if child: ifnot child.recursive:
struct.child_nests.update(child.child_nests)
child.request |= struct.request
child.reply |= struct.reply if spec.is_multi_val():
child.in_multi_val = True
self._sort_pure_types()
def _load_attr_use(self): for _, struct in self.pure_nested_structs.items(): if struct.request: for _, arg in struct.member_list():
arg.set_request() if struct.reply: for _, arg in struct.member_list():
arg.set_reply()
for root_set, rs_members in self.root_sets.items(): for attr, spec in self.attr_sets[root_set].items(): if attr in rs_members['request']:
spec.set_request() if attr in rs_members['reply']:
spec.set_reply()
def _load_selector_passing(self): def all_structs(): for k, v in reversed(self.pure_nested_structs.items()): yield k, v for k, _ in self.root_sets.items(): yield k, None# we don't have a struct, but it must be terminal
for attr_set, struct in all_structs(): for _, spec in self.attr_sets[attr_set].items(): if'nested-attributes'in spec:
child_name = spec['nested-attributes'] elif'sub-message'in spec:
child_name = spec.sub_message else: continue
child = self.pure_nested_structs.get(child_name) for selector in child.external_selectors(): if selector.name in self.attr_sets[attr_set]:
sel_attr = self.attr_sets[attr_set][selector.name]
selector.set_attr(sel_attr) else: raise Exception("Passing selector thru more than one layer not supported")
def _load_global_policy(self):
global_set = set()
attr_set_name = None for op_name, op in self.ops.items(): ifnot op: continue if'attribute-set'notin op: continue
if attr_set_name isNone:
attr_set_name = op['attribute-set'] if attr_set_name != op['attribute-set']: raise Exception('For a global policy all ops must use the same set')
for op_mode in ['do', 'dump']: if op_mode in op:
req = op[op_mode].get('request') if req:
global_set.update(req.get('attributes', []))
self.global_policy = []
self.global_policy_set = attr_set_name for attr in self.attr_sets[attr_set_name]: if attr in global_set:
self.global_policy.append(attr)
def _load_hooks(self): for op in self.ops.values(): for op_mode in ['do', 'dump']: if op_mode notin op: continue for when in ['pre', 'post']: if when notin op[op_mode]: continue
name = op[op_mode][when] if name in self.hooks[when][op_mode]['set']: continue
self.hooks[when][op_mode]['set'].add(name)
self.hooks[when][op_mode]['list'].append(name)
class RenderInfo:
    def __init__(self, cw, family, ku_space, op, op_mode, attr_set=None):
        self.family = family
        self.nl = cw.nlib
        self.ku_space = ku_space
        self.op_mode = op_mode
        self.op = op

        # Length of the fixed header: the family's by default, an op with
        # a header of its own is only supported for classic families
        fixed_hdr = op.fixed_header if op else None
        self.fixed_hdr_len = 'ys->family->hdr_len'
        if op and op.fixed_header and op.fixed_header != family.fixed_header:
            if not family.is_classic():
                raise Exception("Per-op fixed header not supported, yet")
            self.fixed_hdr_len = f"sizeof(struct {c_lower(fixed_hdr)})"

        # 'do' and 'dump' response parsing is identical
        self.type_consistent = True
        self.type_oneside = False
        if op_mode != 'do' and 'dump' in op:
            if 'do' not in op:
                # Only 'dump' exists
                self.type_consistent = True
                self.type_oneside = True
            else:
                do_has_reply = 'reply' in op['do']
                dump_has_reply = 'reply' in op["dump"]
                if do_has_reply != dump_has_reply:
                    self.type_consistent = False
                elif do_has_reply and op["do"]["reply"] != op["dump"]["reply"]:
                    self.type_consistent = False
def close_out_file(self):
    """
    Copy the buffered output into the output file and switch self._out
    back to stdout.

    Does nothing when the output already goes to stdout.  Unless
    overwriting was asked for, an existing output file with identical
    contents is left untouched.
    """
    if self._out == os.sys.stdout:
        return

    # Avoid modifying the file if contents didn't change
    self._out.flush()
    unchanged = (not self._overwrite
                 and os.path.isfile(self._out_file)
                 and filecmp.cmp(self._out.name, self._out_file, shallow=False))
    if unchanged:
        return

    with open(self._out_file, 'w+') as out_file:
        self._out.seek(0)
        shutil.copyfileobj(self._out, out_file)
        self._out.close()
    self._out = os.sys.stdout
@classmethod def _is_cond(cls, line): return line.startswith('if') or line.startswith('while') or line.startswith('for')
def p(self, line, add_ind=0):
    """
    Write one line of output at the current indentation.

    A closing bracket delayed by block_end() is written first, or merged
    into @line when the line starts with 'else'.  A pending blank line is
    written next.  The line itself is indented by self._ind tabs, adjusted
    as follows: one less for lines ending in ':', one more for the line
    following a bracket-less condition or a bare 'else', none at all for
    lines starting with '#'; @add_ind is added on top.

    An empty @line is accepted and produces a bare newline.
    """
    if self._block_end:
        self._block_end = False
        if line.startswith('else'):
            line = '} ' + line
        else:
            self._out.write('\t' * self._ind + '}\n')

    if self._nl:
        self._out.write('\n')
        self._nl = False

    ind = self._ind
    # endswith() / startswith() rather than indexing, so that an empty
    # line does not raise IndexError
    if line.endswith(':'):
        ind -= 1
    if self._silent_block:
        ind += 1
    self._silent_block = line.endswith(')') and CodeWriter._is_cond(line)
    self._silent_block |= line.strip() == 'else'
    if line.startswith('#'):
        ind = 0
    if add_ind:
        ind += add_ind
    # Don't leave trailing whitespace behind on an empty line
    prefix = '\t' * ind if line else ''
    self._out.write(prefix + line + '\n')
def nl(self):
    """Set the pending-newline flag (self._nl)."""
    self._nl = True
def block_start(self, line=''):
    """
    Write an opening bracket, preceded by @line and a space if @line is
    given, and increase the indentation level by one.
    """
    opener = line + ' {' if line else '{'
    self.p(opener)
    self._ind += 1
def block_end(self, line=''):
    """
    Close a block and decrease the indentation level by one.

    With @line the closing bracket is written right away, followed by
    @line (after a space, unless @line starts with ';' or ',').  Without
    it the bracket is only remembered via self._block_end.  A pending
    blank line is dropped either way.
    """
    if line and line[0] not in {';', ','}:
        line = ' ' + line
    self._ind -= 1
    self._nl = False

    if line:
        self.p('}' + line)
        return

    # Delay printing closing bracket in case "else" comes next
    if self._block_end:
        # A bracket is already being delayed, write that one out now
        self._out.write('\t' * (self._ind + 1) + '}\n')
    self._block_end = True
# Write the words of @doc as ' *'-prefixed comment lines through self.p().
# A new output line is started whenever appending the next word would make
# the line 79 characters or longer; when @indent is true the continuation
# lines get extra whitespace after the ' *' marker.
# NOTE(review): the amount of whitespace inside the string literals below
# may not match the file on disk exactly - confirm before relying on it.
def write_doc_line(self, doc, indent=True):
words = doc.split()
line = ' *' for word in words: if len(line) + len(word) >= 79:
self.p(line)
line = ' *' if indent:
line += ' '
line += ' ' + word
self.p(line)
def writes_defines(self, defines):
    """
    Write a '#define' line for each (name, value) entry of @defines.

    Values are lined up with tabs at the first tab stop past the longest
    name.  Integer values are written as they are, strings are written in
    double quotes, values of any other type are left out.
    """
    longest = max([len(entry[0]) for entry in defines], default=0)
    # Round up to the next multiple of 8, always leaving some room
    column = ((longest + 8) // 8) * 8

    for entry in defines:
        name, value = entry[0], entry[1]
        tabs = (column - len(name) + 7) // 8
        line = '#define ' + name + '\t' * tabs
        if type(value) is int:
            line += str(value)
        elif type(value) is str:
            line += '"' + value + '"'
        self.p(line)
def write_struct_init(self, members):
    """
    Write a '.name = value,' line for each (name, value) entry of
    @members, with the '=' signs lined up using tabs.
    """
    # The + 1 is because we prepend a .
    longest = max([len(entry[0]) for entry in members]) + 1
    column = ((longest + 8) // 8) * 8

    for entry in members:
        name, value = entry[0], entry[1]
        tabs = (column - len(name) - 1 + 7) // 8
        self.p('.' + name + '\t' * tabs + '= ' + str(value) + ',')
def ifdef_block(self, config):
    """
    Switch the '#ifdef CONFIG_...' block the output is in.

    The block currently open, if any, is closed with '#endif' and a block
    for @config is opened; a false @config means no block.  Nothing is
    written if the right block is open already.
    """
    wanted = 'CONFIG_' + c_upper(config) if config else None
    current = self._ifdef_block
    if current == wanted:
        return

    if current:
        self.p('#endif /* ' + current + ' */')
    if wanted:
        self.p('#ifdef ' + wanted)
    self._ifdef_block = wanted
# Open the initializer of the static '<family.c_name>_op_strmap' string
# array and walk over family.msgs; only messages with a rsp_value are looked
# at.  For a message which is not the one family.rsp_by_value records for
# its reply value, a '// skip ...' comment line is written and the message
# is passed over.
# NOTE(review): what is written for the remaining messages, and the closing
# of the block, are not visible in this part of the body.
def put_op_name(family, cw):
map_name = f'{family.c_name}_op_strmap'
cw.block_start(line=f"static const char * const {map_name}[] =") for op_name, op in family.msgs.items(): if op.rsp_value: # Make sure we don't add duplicated entries, if multiple commands # produce the same response in legacy families. if family.rsp_by_value[op.rsp_value] != op:
cw.p(f'// skip "{op_name}", duplicate reply value') continue
for arg in struct.inherited:
ri.cw.p(f'dst->{arg} = {arg};')
if struct.fixed_header: if struct.nested:
ri.cw.p('hdr = ynl_attr_data(nested);') elif ri.family.is_classic():
ri.cw.p('hdr = ynl_nlmsg_data(nlh);') else:
ri.cw.p('hdr = ynl_nlmsg_data_offset(nlh, sizeof(struct genlmsghdr));')
ri.cw.p(f"memcpy(&dst->_hdr, hdr, sizeof({struct.fixed_header}));") for anest in sorted(all_multi):
aspec = struct[anest]
ri.cw.p(f"if (dst->{aspec.c_name})")
ri.cw.p(f'return ynl_error_parse(yarg, "attribute already present ({struct.attr_set.name}.{aspec.name})");')
ri.cw.nl()
ri.cw.block_start(line=iter_line)
ri.cw.p('unsigned int type = ynl_attr_type(attr);')
ri.cw.nl()
first = True for _, arg in struct.member_list():
good = arg.attr_get(ri, 'dst', first=first) # First may be 'unused' or 'pad', ignore those
first &= not good
ri.cw.block_end()
ri.cw.nl()
for anest in sorted(array_nests):
aspec = struct[anest]
first = True for name, arg in struct.member_list():
kw = 'if'if first else'else if'
first = False
ri.cw.block_start(line=f'{kw} (!strcmp(sel, "{name}"))')
get_lines, init_lines, _ = arg._attr_get(ri, var) for line in init_lines or []:
ri.cw.p(line) for line in get_lines:
ri.cw.p(line) if arg.presence_type() == 'present':
ri.cw.p(f"{var}->_present.{arg.c_name} = 1;")
ri.cw.block_end()
ri.cw.p('return 0;')
ri.cw.block_end()
ri.cw.nl()
# Build the argument list of the parsing function of a nested struct: the
# parse arg and the nested attribute, one '_sel_<name>' string per external
# selector of @struct, a 'sel' string in second position for sub-messages,
# and one __u32 per inherited member.
# NOTE(review): the use of func_args and of @suffix is not visible in this
# part of the body.
def parse_rsp_nested_prototype(ri, struct, suffix=';'):
func_args = ['struct ynl_parse_arg *yarg', 'const struct nlattr *nested'] for sel in struct.external_selectors():
func_args.append('const char *_sel_' + sel.name) if struct.submsg:
func_args.insert(1, 'const char *sel') for arg in struct.inherited:
func_args.append('__u32 ' + arg)
if ri.needs_nlflags(direction):
ri.cw.p('__u16 _nlmsg_flags;')
ri.cw.nl() if struct.fixed_header:
ri.cw.p(struct.fixed_header + ' _hdr;')
ri.cw.nl()
for type_filter in ['present', 'len', 'count']:
meta_started = False for _, attr in struct.member_list():
line = attr.presence_member(ri.ku_space, type_filter) if line: ifnot meta_started:
ri.cw.block_start(line=f"struct")
meta_started = True
ri.cw.p(line) if meta_started:
ri.cw.block_end(line=f'_{type_filter};')
ri.cw.nl()
for arg in struct.inherited:
ri.cw.p(f"__u32 {arg};")
for _, attr in struct.member_list():
attr.struct_member(ri)
if struct.request and struct.in_multi_val:
print_alloc_wrapper(ri, "", struct)
ri.cw.nl()
free_rsp_nested_prototype(ri)
ri.cw.nl()
# Name conflicts are too hard to deal with with the current code base, # they are very rare so don't bother printing setters in that case. if ri.ku_space == 'user'andnot ri.type_name_conflict: for _, attr in struct.member_list():
attr.setter(ri, ri.attr_set, "", var="obj")
ri.cw.nl()
if ri.needs_nlflags(direction):
print_nlflags_set(ri, direction)
if ri.ku_space == 'user'and direction == 'request': for _, attr in ri.struct[direction].member_list():
attr.setter(ri, ri.attr_set, direction, deref=deref)
ri.cw.nl()
def print_req_type_helpers(ri):
    """
    Print the alloc wrapper and the type helpers of the request type,
    unless ri reports the request type as empty.
    """
    direction = "request"
    if not ri.type_empty(direction):
        print_alloc_wrapper(ri, direction)
        print_type_helpers(ri, direction)
def print_req_policy_fwd(cw, struct, ri=None, terminate=True):
    """
    Write the declaration line of a request policy array.

    With @terminate the line is an 'extern' forward declaration ending in
    ';' - left out altogether when @ri is given and the policy should be
    static.  Without it the line opens the definition and ends in ' = {'.
    The array is named after the op of @ri (plus the op mode for ops with
    dual policy) or, without @ri, after @struct.
    """
    if terminate:
        if ri and policy_should_be_static(struct.family):
            return
        prefix = 'extern '
        suffix = ';'
    else:
        prefix = 'static ' if ri and policy_should_be_static(struct.family) else ''
        suffix = ' = {'

    max_attr = struct.attr_max_val
    if not ri:
        name = struct.render_name
    else:
        name = ri.op.render_name
        if ri.op.dual_policy:
            name += '_' + ri.op_mode

    cw.p(f"{prefix}const struct nla_policy {name}_nl_policy[{max_attr.enum_name} + 1]{suffix}")
def print_req_policy(cw, struct, ri=None):
    """
    Write the definition of a request policy array: the opening line, the
    policy of each member of @struct and the closing bracket.  When @ri
    has an op, the op's 'config-cond' ifdef block is entered first; any
    ifdef block is left again at the end.
    """
    has_op = ri and ri.op
    if has_op:
        cw.ifdef_block(ri.op.get('config-cond', None))

    print_req_policy_fwd(cw, struct, ri=ri, terminate=False)
    for _, member in struct.member_list():
        member.attr_policy(cw)
    cw.p("};")

    cw.ifdef_block(None)
    cw.nl()
ifnot exported:
cnt = "" elif family.kernel_policy == 'split':
cnt = 0 for op in family.ops.values(): if'do'in op:
cnt += 1 if'dump'in op:
cnt += 1 else:
cnt = len(family.ops)
qual = 'static const'ifnot exported else'const'
line = f"{qual} struct {struct_type} {family.c_name}_nl_ops[{cnt}]" if terminate:
cw.p(f"extern {line};") else:
cw.block_start(line=line + ' =')
ifnot terminate: return
cw.nl() for name in family.hooks['pre']['do']['list']:
cw.write_func_prot('int', c_lower(name),
['const struct genl_split_ops *ops', 'struct sk_buff *skb', 'struct genl_info *info'], suffix=';') for name in family.hooks['post']['do']['list']:
cw.write_func_prot('void', c_lower(name),
['const struct genl_split_ops *ops', 'struct sk_buff *skb', 'struct genl_info *info'], suffix=';') for name in family.hooks['pre']['dump']['list']:
cw.write_func_prot('int', c_lower(name),
['struct netlink_callback *cb'], suffix=';') for name in family.hooks['post']['dump']['list']:
cw.write_func_prot('int', c_lower(name),
['struct netlink_callback *cb'], suffix=';')
cw.nl()
for op_name, op in family.ops.items(): if op.is_async: continue
# Write the entries of the kernel op table, after opening it through
# print_kernel_op_table_fwd(..., terminate=False).
#
# For the 'global' and 'per-op' kernel policies one entry is written per
# op which is not async: the command, the validation flags, the 'doit' /
# 'dumpit' handlers, for 'per-op' also the policy and max attribute (taken
# from the 'do' request attributes), and the op flags.
#
# For the 'split' kernel policy one entry is written per op and mode ('do',
# 'dump') the op has: 'dont-validate' values which do not apply to the mode
# are dropped, 'pre' / 'post' hooks go to the callbacks named in cb_names,
# policy and max attribute are added for modes with a request, and a
# 'cmd-cap-<mode>' flag is appended to the op flags.
#
# Each entry sits inside the ifdef block of the op's 'config-cond'; the
# ifdef block is left at the end.
def print_kernel_op_table(family, cw):
print_kernel_op_table_fwd(family, cw, terminate=False) if family.kernel_policy == 'global'or family.kernel_policy == 'per-op': for op_name, op in family.ops.items(): if op.is_async: continue
cw.ifdef_block(op.get('config-cond', None))
cw.block_start()
members = [('cmd', op.enum_name)] if'dont-validate'in op:
members.append(('validate', ' | '.join([c_upper('genl-dont-validate-' + x) for x in op['dont-validate']])), ) for op_mode in ['do', 'dump']: if op_mode in op:
name = c_lower(f"{family.ident_name}-nl-{op_name}-{op_mode}it")
members.append((op_mode + 'it', name)) if family.kernel_policy == 'per-op':
struct = Struct(family, op['attribute-set'],
type_list=op['do']['request']['attributes'])
name = c_lower(f"{family.ident_name}-{op_name}-nl-policy")
members.append(('policy', name))
members.append(('maxattr', struct.attr_max_val.enum_name)) if'flags'in op:
members.append(('flags', ' | '.join([c_upper('genl-' + x) for x in op['flags']])))
cw.write_struct_init(members)
cw.block_end(line=',') elif family.kernel_policy == 'split':
cb_names = {'do': {'pre': 'pre_doit', 'post': 'post_doit'}, 'dump': {'pre': 'start', 'post': 'done'}}
for op_name, op in family.ops.items(): for op_mode in ['do', 'dump']: if op.is_async or op_mode notin op: continue
cw.ifdef_block(op.get('config-cond', None))
cw.block_start()
members = [('cmd', op.enum_name)] if'dont-validate'in op:
dont_validate = [] for x in op['dont-validate']: if op_mode == 'do'and x in ['dump', 'dump-strict']: continue if op_mode == "dump"and x == 'strict': continue
dont_validate.append(x)
if dont_validate:
members.append(('validate', ' | '.join([c_upper('genl-dont-validate-' + x) for x in dont_validate])), )
name = c_lower(f"{family.ident_name}-nl-{op_name}-{op_mode}it") if'pre'in op[op_mode]:
members.append((cb_names[op_mode]['pre'], c_lower(op[op_mode]['pre'])))
members.append((op_mode + 'it', name)) if'post'in op[op_mode]:
members.append((cb_names[op_mode]['post'], c_lower(op[op_mode]['post']))) if'request'in op[op_mode]:
struct = Struct(family, op['attribute-set'],
type_list=op[op_mode]['request']['attributes'])
if op.dual_policy:
name = c_lower(f"{family.ident_name}-{op_name}-{op_mode}-nl-policy") else:
name = c_lower(f"{family.ident_name}-{op_name}-nl-policy")
members.append(('policy', name))
members.append(('maxattr', struct.attr_max_val.enum_name))
flags = (op['flags'] if'flags'in op else []) + ['cmd-cap-' + op_mode]
members.append(('flags', ' | '.join([c_upper('genl-' + x) for x in flags])))
cw.write_struct_init(members)
cw.block_end(line=',')
cw.ifdef_block(None)
uapi_enum_start(family, cw, family['operations'], 'enum-name')
val = 0 for op in family.msgs.values(): if separate_ntf and ('notify'in op or'event'in op): continue
suffix = ',' if op.value != val:
suffix = f" = {op.value},"
val = op.value
cw.p(op.enum_name + suffix)
val += 1
cw.nl()
cw.p(cnt_name + (''if max_by_define else',')) ifnot max_by_define:
cw.p(f"{max_name} = {max_value}")
cw.block_end(line=';') if max_by_define:
cw.p(f"#define {max_name} {max_value}")
cw.nl()
cw.block_start(line='enum')
cw.p(c_upper(f'{family.name}_MSG_KERNEL_NONE = 0,'))
val = 0 for op in family.msgs.values(): if ('do'in op and'reply'in op['do']) or'notify'in op or'event'in op:
enum_name = op.enum_name if'event'notin op and'notify'notin op:
enum_name = f'{enum_name}_REPLY'
suffix = ',' if op.value and op.value != val:
suffix = f" = {op.value},"
val = op.value
cw.p(enum_name + suffix)
val += 1
cw.nl()
cw.p(cnt_name + (''if max_by_define else',')) ifnot max_by_define:
cw.p(f"{max_name} = {max_value}")
cw.block_end(line=';') if max_by_define:
cw.p(f"#define {max_name} {max_value}")
cw.nl()
defines = [] for const in family['definitions']: if const.get('header'): continue
if const['type'] != 'const':
cw.writes_defines(defines)
defines = []
cw.nl()
# Write kdoc for enum and flags (one day maybe also structs) if const['type'] == 'enum'or const['type'] == 'flags':
enum = family.consts[const['name']]
if enum.header: continue
if enum.has_doc(): if enum.has_entry_doc():
cw.p('/**')
doc = '' if'doc'in enum:
doc = ' - ' + enum['doc']
cw.write_doc_line(enum.enum_name + doc) else:
cw.p('/*')
cw.write_doc_line(enum['doc'], indent=False) for entry in enum.entries.values(): if entry.has_doc():
doc = '@' + entry.c_name + ': ' + entry['doc']
cw.write_doc_line(doc)
cw.p(' */')
uapi_enum_start(family, cw, const, 'name')
name_pfx = const.get('name-prefix', f"{family.ident_name}-{const['name']}-") for entry in enum.entries.values():
suffix = ',' if entry.value_change:
suffix = f" = {entry.user_value()}" + suffix
cw.p(entry.c_name + suffix)
if separate_ntf:
uapi_enum_start(family, cw, family['operations'], enum_name='async-enum') for op in family.msgs.values(): if separate_ntf andnot ('notify'in op or'event'in op): continue
# Multicast
defines = [] for grp in family.mcgrps['list']:
name = grp['name']
defines.append([c_upper(grp.get('c-define-name', f"{family.ident_name}-mcgrp-{name}")),
f'{name}'])
cw.nl() if defines:
cw.writes_defines(defines)
cw.nl()
def render_user_family(family, cw, prototype):
    """
    Write the 'ynl_<family>_family' description of the family.

    With @prototype only the 'extern' declaration is written.  Otherwise
    the notification info table is written first (if the family has
    notifications), followed by the initializer of the family struct:
    name, classic family details, header length and notification table.
    """
    symbol = f'const struct ynl_family ynl_{family.c_name}_family'
    if prototype:
        cw.p(f'extern {symbol};')
        return

    if family.ntfs:
        cw.block_start(line=f"static const struct ynl_ntf_info {family.c_name}_ntf_info[] = ")
        for ntf_op_name, ntf_op in family.ntfs.items():
            if 'notify' in ntf_op:
                # Rendered from the op which the notification refers to
                op = family.ops[ntf_op['notify']]
                ri = RenderInfo(cw, family, "user", op, "notify")
            elif 'event' in ntf_op:
                ri = RenderInfo(cw, family, "user", ntf_op, "event")
            else:
                raise Exception('Invalid notification ' + ntf_op_name)
            _render_user_ntf_entry(ri, ntf_op)
        for op_name, op in family.ops.items():
            if 'event' in op:
                ri = RenderInfo(cw, family, "user", op, "event")
                _render_user_ntf_entry(ri, op)
        cw.block_end(line=";")
        cw.nl()

    cw.block_start(f'{symbol} = ')
    cw.p(f'.name\t\t= "{family.c_name}",')
    if family.is_classic():
        cw.p('.is_classic\t= true,')
        cw.p(f'.classic_id\t= {family.get("protonum")},')

    # Classic families have no genetlink header in front of the fixed one
    if family.is_classic():
        if family.fixed_header:
            cw.p(f'.hdr_len\t= sizeof(struct {c_lower(family.fixed_header)}),')
    elif family.fixed_header:
        cw.p(f'.hdr_len\t= sizeof(struct genlmsghdr) + sizeof(struct {c_lower(family.fixed_header)}),')
    else:
        cw.p('.hdr_len\t= sizeof(struct genlmsghdr),')

    if family.ntfs:
        cw.p(f".ntf_info\t= {family.c_name}_ntf_info,")
        cw.p(f".ntf_info_size\t= YNL_ARRAY_SIZE({family.c_name}_ntf_info),")
    cw.block_end(line=';')
def family_contains_bitfield32(family):
    """Return True if any attribute in *family* has type "bitfield32".

    Attribute sets whose ``subset_of`` is truthy are not inspected; only
    the remaining sets in ``family.attr_sets`` are scanned.
    """
    return any(
        attr.type == "bitfield32"
        for _, attr_set in family.attr_sets.items()
        if not attr_set.subset_of
        for _, attr in attr_set.items()
    )
def main():
# Command-line entry point: parse the arguments, load the spec into a Family,
# then write the generated C text through the code writer `cw`.
# NOTE(review): `cw` and `hdr_file` are used below but their assignments are not
# visible in this chunk; the function body also appears to be cut off after the
# last line shown here - confirm against the full file before editing.
parser = argparse.ArgumentParser(description='Netlink simple parsing generator')
parser.add_argument('--mode', dest='mode', type=str, required=True,
choices=('user', 'kernel', 'uapi'))
parser.add_argument('--spec', dest='spec', type=str, required=True)
parser.add_argument('--header', dest='header', action='store_true', default=None)
parser.add_argument('--source', dest='header', action='store_false')
parser.add_argument('--user-header', nargs='+', default=[])
parser.add_argument('--cmp-out', action='store_true', default=None,
help='Do not overwrite the output file if the new output is identical to the old')
parser.add_argument('--exclude-op', action='append', default=[])
parser.add_argument('-o', dest='out_file', type=str, default=None)
args = parser.parse_args()
# --header and --source share dest='header' (default None), so None here means
# neither flag was passed.
if args.header isNone:
parser.error("--header or --source is required")
# Compile the --exclude-op expressions once; they are handed to Family() below.
exclude_ops = [re.compile(expr) for expr in args.exclude_op]
# Load the spec. Any license string other than the expected dual license, or a
# YAML error, prints a message and exits with status 1.
try:
parsed = Family(args.spec, exclude_ops) if parsed.license != '((GPL-2.0 WITH Linux-syscall-note) OR BSD-3-Clause)':
print('Spec license:', parsed.license)
print('License must be: ((GPL-2.0 WITH Linux-syscall-note) OR BSD-3-Clause)')
os.sys.exit(1) except yaml.YAMLError as exc:
print(exc)
os.sys.exit(1) return
# Include section: kernel mode and the other modes emit different #include lines.
if args.mode == 'kernel':
cw.p('#include <net/netlink.h>')
cw.p('#include <net/genetlink.h>')
cw.nl() ifnot args.header: if args.out_file:
cw.p(f'#include "{hdr_file}"')
cw.nl()
headers = ['uapi/' + parsed.uapi_header]
headers += parsed.kernel_family.get('headers', []) else:
cw.p('#include <stdlib.h>')
cw.p('#include <string.h>') if args.header:
cw.p('#include <linux/types.h>') if family_contains_bitfield32(parsed):
cw.p('#include <linux/netlink.h>') else:
cw.p(f'#include "{hdr_file}"')
cw.p('#include "ynl.h"')
headers = [] for definition in parsed['definitions'] + parsed['attribute-sets']: if'header'in definition:
headers.append(definition['header']) if args.mode == 'user':
headers.append(parsed.uapi_header)
# Print each collected header once, in first-seen order.
seen_header = [] for one in headers: if one notin seen_header:
cw.p(f"#include <{one}>")
seen_header.append(one)
cw.nl()
# User mode extras: the non-header output includes <linux/genetlink.h> plus every
# --user-header file; the other branch forward-declares struct ynl_sock and the
# family prototype.
if args.mode == "user": ifnot args.header:
cw.p("#include <linux/genetlink.h>")
cw.nl() for one in args.user_header:
cw.p(f'#include "{one}"') else:
cw.p('struct ynl_sock;')
cw.nl()
render_user_family(parsed, cw, True)
cw.nl()
# Kernel mode: request-policy forward declarations (print_req_policy_fwd) are
# emitted first, the policies themselves (print_req_policy) further down.
# NOTE(review): no `else:` separating the two parts is visible here - confirm
# how they are split between header and source output.
if args.mode == "kernel": if args.header: for _, struct in sorted(parsed.pure_nested_structs.items()): if struct.request:
cw.p('/* Common nested types */') break for attr_set, struct in sorted(parsed.pure_nested_structs.items()): if struct.request:
print_req_policy_fwd(cw, struct)
cw.nl()
if parsed.kernel_policy == 'global':
cw.p(f"/* Global operation policy for {parsed.name} */")
if parsed.kernel_policy in {'per-op', 'split'}: for op_name, op in parsed.ops.items(): if'do'in op and'event'notin op:
ri = RenderInfo(cw, parsed, args.mode, op, "do")
print_req_policy_fwd(cw, ri.struct['request'], ri=ri)
cw.nl()
for _, struct in sorted(parsed.pure_nested_structs.items()): if struct.request:
cw.p('/* Common nested types */') break for attr_set, struct in sorted(parsed.pure_nested_structs.items()): if struct.request:
print_req_policy(cw, struct)
cw.nl()
if parsed.kernel_policy == 'global':
cw.p(f"/* Global operation policy for {parsed.name} */")
for op_name, op in parsed.ops.items(): if parsed.kernel_policy in {'per-op', 'split'}: for op_mode in ['do', 'dump']: if op_mode in op and'request'in op[op_mode]:
cw.p(f"/* {op.enum_name} - {op_mode} */")
ri = RenderInfo(cw, parsed, args.mode, op, op_mode)
print_req_policy(cw, ri.struct['request'], ri=ri)
cw.nl()
# User mode, header output: forward declarations for the enum helpers, full
# nested types, then per-operation request/response types and prototypes.
if args.mode == "user": if args.header:
cw.p('/* Enums */')
put_op_name_fwd(parsed, cw)
for name, const in parsed.consts.items(): if isinstance(const, EnumSet):
put_enum_to_str_fwd(parsed, cw, const)
cw.nl()
cw.p('/* Common nested types */') for attr_set, struct in parsed.pure_nested_structs.items():
ri = RenderInfo(cw, parsed, args.mode, "", "", attr_set)
print_type_full(ri, struct)
for op_name, op in parsed.ops.items():
cw.p(f"/* ============== {op.enum_name} ============== */")
if'do'in op and'event'notin op:
cw.p(f"/* {op.enum_name} - do */")
ri = RenderInfo(cw, parsed, args.mode, op, "do")
print_req_type(ri)
print_req_type_helpers(ri)
cw.nl()
print_rsp_type(ri)
print_rsp_type_helpers(ri)
cw.nl()
print_req_prototype(ri)
cw.nl()
if op.has_ntf:
cw.p(f"/* {op.enum_name} - notify */")
ri = RenderInfo(cw, parsed, args.mode, op, 'notify') ifnot ri.type_consistent: raise Exception(f'Only notifications with consistent types supported ({op.name})')
print_wrapped_type(ri)
for op_name, op in parsed.ntfs.items(): if'event'in op:
ri = RenderInfo(cw, parsed, args.mode, op, 'event')
cw.p(f"/* {op.enum_name} - event */")
print_rsp_type(ri)
cw.nl()
print_wrapped_type(ri)
# The `else:` on the next line presumably pairs with `if args.header:` above
# (user-mode source output) - indentation is lost here, so verify.
cw.nl() else:
cw.p('/* Enums */')
put_op_name(parsed, cw)
for name, const in parsed.consts.items(): if isinstance(const, EnumSet):
put_enum_to_str(parsed, cw, const)
cw.nl()
# Recursive nested structs get a forward declaration (put_typol_fwd) before the
# policies are written; the flag also gates the nested prototypes below.
has_recursive_nests = False
cw.p('/* Policies */') for struct in parsed.pure_nested_structs.values(): if struct.recursive:
put_typol_fwd(cw, struct)
has_recursive_nests = True if has_recursive_nests:
cw.nl() for struct in parsed.pure_nested_structs.values():
put_typol(cw, struct) for name in parsed.root_sets:
struct = Struct(parsed, name)
put_typol(cw, struct)
cw.p('/* Common nested types */') if has_recursive_nests: for attr_set, struct in parsed.pure_nested_structs.items():
ri = RenderInfo(cw, parsed, args.mode, "", "", attr_set)
free_rsp_nested_prototype(ri) if struct.request:
put_req_nested_prototype(ri, struct) if struct.reply:
parse_rsp_nested_prototype(ri, struct)
cw.nl() for attr_set, struct in parsed.pure_nested_structs.items():
ri = RenderInfo(cw, parsed, args.mode, "", "", attr_set)
free_rsp_nested(ri, struct) if struct.request:
put_req_nested(ri, struct) if struct.reply:
parse_rsp_nested(ri, struct)
# Per-operation code: for each 'do' that is not an event, emit the free helpers,
# the response parser and the request function.
for op_name, op in parsed.ops.items():
cw.p(f"/* ============== {op.enum_name} ============== */") if'do'in op and'event'notin op:
cw.p(f"/* {op.enum_name} - do */")
ri = RenderInfo(cw, parsed, args.mode, op, "do")
print_req_free(ri)
print_rsp_free(ri)
parse_rsp_msg(ri)
print_req(ri)
cw.nl()
¤ Diese beiden folgenden Angebotsgruppen bietet das Unternehmen0.69Angebot
(Wie Sie bei der Firma Beratungs- und Dienstleistungen beauftragen können 2026-04-29)
¤
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit noch Richtigkeit
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.