Compare commits

...

2 Commits

Author SHA1 Message Date
Azlux bf5e55a08a add table name 2024-01-16 17:35:16 +01:00
Azlux f8a8e38ff2 update for nftables 1.0.6 2024-01-16 14:43:22 +01:00
4 changed files with 43 additions and 25 deletions

View File

@ -86,19 +86,27 @@ You need at least one section to make this program work. These sections cannot b
- `set_name` (str)
This is the Set Name you want to work with
- `enable` (bool)
no Default
You need to specify if you want to activate this rule with `True` or keep it disabled with `False`
- `typeof` (str)
- `family` (str)
no Default
Choose between `ipv4` or `ipv6`, cannot be both since nftables set are simple stack.
Choose between `ip`, `ip6` and `inet`. This specifie the nftables "address family" used by the filter.
- `table` (str)
Default: `filter`
Tables can be specified to be in output, forward, or custom tables. The default is `filter` because it's the default table name on the debian package.
- `domains` (str)
no Default
This is the domain (fqdn) you want to resolve and added to your set.

View File

@ -6,8 +6,10 @@ from typing import List
class ModelEntry(BaseModel):
set_name: str
typeof: int
fqdn: str
family: str
typeof: int
table: str
ip_list: List[IPvAnyAddress] | None
ttl: int | None
next_update: datetime.datetime | None

View File

@ -10,11 +10,13 @@ include_config_dir = /etc/nft-dns.d/
#[debian]
#set_name = ALLOW-DNS
#enable = true
#typeof = ipv4
#family=ip
#table=filter
#domains = deb.debian.org, security.debian.org
#[debian6]
#set_name = ALLOW-DNS-6
#enable = true
#typeof = ipv6
#family=ip6
#table=filter
#domains = deb.debian.org, security.debian.org

View File

@ -48,16 +48,31 @@ def read_config():
for section in config.sections():
if section != 'GLOBAL' and config[section].getboolean('enable', fallback=False):
for fqdn in config[section]["domains"].split(','):
if config[section]["typeof"] == "ipv4":
type_of = 4
elif config[section]["typeof"] == "ipv6":
type_of = 6
if config[section]["family"] in ['ip', 'ip6', 'inet']:
family = config[section]["family"]
else:
print("Erreur de config")
print(f"Erreur de config, family of {fqdn} not : ip, ip6 or inet")
exit(1)
table = config[section].get('table', fallback='filter')
res = run_command(f"nft list set {family} {table} {config[section]['set_name']}")
typeof = 4
if not (args.dry_run or (config.has_option('GLOBAL', 'verbose') and config['GLOBAL'].getboolean('dry_run', fallback=False))):
if "type ipv4_addr" in res:
typeof = 4
logging.debug(f"set {config[section]['set_name']} well defined in ipv4_addr family")
elif "type ipv6_addr" in res:
typeof = 6
logging.debug(f"set {config[section]['set_name']} well defined in ipv6_addr family")
else:
logging.error(f"Type of the {config[section]['set_name']} set not defined to \"ipv4_addr\" or \"ipv6_addr\" into the nftables set. Only theses type are allowed.")
exit(1)
else:
logging.info('The dry_run option force the typeof to "ipv4" since not command are executed to check that')
result = entry.ModelEntry(
set_name=config[section]["set_name"],
typeof=type_of,
family=family,
table=table,
typeof=typeof,
fqdn=fqdn.strip(),
ip_list=None,
ttl=None,
@ -68,15 +83,6 @@ def read_config():
if len(values) == 0:
logging.error("No entries configurated, I've nothing to do, Exiting in tears...")
exit(1)
list_set = list(set([i.set_name for i in values])) # get all nft named set once
for set_name in list_set:
res = run_command(f"nft list set filter {set_name}")
if not (args.dry_run or (config.has_option('GLOBAL', 'verbose') and config['GLOBAL'].getboolean('dry_run', fallback=False))):
if "ipv4_addr" in res or "ipv6_addr" in res:
logging.debug(f"set {set_name} well defined")
else:
logging.error(f'Type of the {set_name} set, not defined on "ipv4_addr" or "ipv6_addr"')
exit(1)
logging.info("# End of Parsing")
@ -114,7 +120,7 @@ def update_dns() -> None:
logging.info(f"Updating the IPv{i.typeof} for {i.fqdn} with {i.ip_list}")
apply_config_entry(i, old_ip_list=old_ip_list)
else:
logging.info(f"Nothing have change for the IPv{i.typeof} for {i.fqdn}")
logging.debug(f"Nothing have change for the IPv{i.typeof} for {i.fqdn}")
values = [i for i in values if i.ip_list is not None]
@ -124,16 +130,16 @@ def get_next_run_timer() -> datetime:
def apply_config_entry(one_entry: entry.ModelEntry, old_ip_list: List[IPvAnyAddress] | None) -> None:
if old_ip_list:
run_command(f"nft delete element filter {one_entry.set_name} {{{', '.join([str(ip) for ip in old_ip_list])}}}")
run_command(f"nft delete element {one_entry.family} {one_entry.table} {one_entry.set_name} {{{', '.join([str(ip) for ip in old_ip_list])}}}")
if one_entry.ip_list:
run_command(f"nft add element filter {one_entry.set_name} {{{', '.join([str(ip) for ip in one_entry.ip_list])}}}")
run_command(f"nft add element {one_entry.family} {one_entry.table} {one_entry.set_name} {{{', '.join([str(ip) for ip in one_entry.ip_list])}}}")
def remove_config_entries():
logging.info("Cleaning all entries")
for i in values:
run_command(f"nft delete element filter {i.set_name} {{{', '.join([str(ip) for ip in i.ip_list])}}}")
run_command(f"nft delete element {i.family} {i.table} {i.set_name} {{{', '.join([str(ip) for ip in i.ip_list])}}}")
def run_command(cmd: str) -> str:
@ -149,7 +155,7 @@ def run_command(cmd: str) -> str:
logging.error("The nft command isn't found, Run with --dry-run to avoid nftable change tries")
exit(1)
else:
logging.debug("Dry-run detected, logging only")
logging.debug("Dry-run detected, logging only, the previous command isn't executed")
def run_loop():