Compare commits

...

2 Commits

Author SHA1 Message Date
bf5e55a08a add table name 2024-01-16 17:35:16 +01:00
f8a8e38ff2 update for nftables 1.0.6 2024-01-16 14:43:22 +01:00
4 changed files with 43 additions and 25 deletions

View File

@ -86,19 +86,27 @@ You need at least one section to make this program work. These sections cannot b
- `set_name` (str) - `set_name` (str)
This is the Set Name you want to work with This is the Set Name you want to work with
- `enable` (bool) - `enable` (bool)
no Default no Default
You need to specify if you want to activate this rule with `True` or keep it disabled with `False` You need to specify if you want to activate this rule with `True` or keep it disabled with `False`
- `typeof` (str) - `family` (str)
no Default no Default
Choose between `ipv4` or `ipv6`, cannot be both since nftables set are simple stack. Choose between `ip`, `ip6` and `inet`. This specifie the nftables "address family" used by the filter.
- `table` (str)
Default: `filter`
Tables can be specified to be in output, forward, or custom tables. The default is `filter` because it's the default table name on the debian package.
- `domains` (str) - `domains` (str)
no Default no Default
This is the domain (fqdn) you want to resolve and added to your set. This is the domain (fqdn) you want to resolve and added to your set.

View File

@ -6,8 +6,10 @@ from typing import List
class ModelEntry(BaseModel): class ModelEntry(BaseModel):
set_name: str set_name: str
typeof: int
fqdn: str fqdn: str
family: str
typeof: int
table: str
ip_list: List[IPvAnyAddress] | None ip_list: List[IPvAnyAddress] | None
ttl: int | None ttl: int | None
next_update: datetime.datetime | None next_update: datetime.datetime | None

View File

@ -10,11 +10,13 @@ include_config_dir = /etc/nft-dns.d/
#[debian] #[debian]
#set_name = ALLOW-DNS #set_name = ALLOW-DNS
#enable = true #enable = true
#typeof = ipv4 #family=ip
#table=filter
#domains = deb.debian.org, security.debian.org #domains = deb.debian.org, security.debian.org
#[debian6] #[debian6]
#set_name = ALLOW-DNS-6 #set_name = ALLOW-DNS-6
#enable = true #enable = true
#typeof = ipv6 #family=ip6
#table=filter
#domains = deb.debian.org, security.debian.org #domains = deb.debian.org, security.debian.org

View File

@ -48,16 +48,31 @@ def read_config():
for section in config.sections(): for section in config.sections():
if section != 'GLOBAL' and config[section].getboolean('enable', fallback=False): if section != 'GLOBAL' and config[section].getboolean('enable', fallback=False):
for fqdn in config[section]["domains"].split(','): for fqdn in config[section]["domains"].split(','):
if config[section]["typeof"] == "ipv4": if config[section]["family"] in ['ip', 'ip6', 'inet']:
type_of = 4 family = config[section]["family"]
elif config[section]["typeof"] == "ipv6":
type_of = 6
else: else:
print("Erreur de config") print(f"Erreur de config, family of {fqdn} not : ip, ip6 or inet")
exit(1) exit(1)
table = config[section].get('table', fallback='filter')
res = run_command(f"nft list set {family} {table} {config[section]['set_name']}")
typeof = 4
if not (args.dry_run or (config.has_option('GLOBAL', 'verbose') and config['GLOBAL'].getboolean('dry_run', fallback=False))):
if "type ipv4_addr" in res:
typeof = 4
logging.debug(f"set {config[section]['set_name']} well defined in ipv4_addr family")
elif "type ipv6_addr" in res:
typeof = 6
logging.debug(f"set {config[section]['set_name']} well defined in ipv6_addr family")
else:
logging.error(f"Type of the {config[section]['set_name']} set not defined to \"ipv4_addr\" or \"ipv6_addr\" into the nftables set. Only theses type are allowed.")
exit(1)
else:
logging.info('The dry_run option force the typeof to "ipv4" since not command are executed to check that')
result = entry.ModelEntry( result = entry.ModelEntry(
set_name=config[section]["set_name"], set_name=config[section]["set_name"],
typeof=type_of, family=family,
table=table,
typeof=typeof,
fqdn=fqdn.strip(), fqdn=fqdn.strip(),
ip_list=None, ip_list=None,
ttl=None, ttl=None,
@ -68,15 +83,6 @@ def read_config():
if len(values) == 0: if len(values) == 0:
logging.error("No entries configurated, I've nothing to do, Exiting in tears...") logging.error("No entries configurated, I've nothing to do, Exiting in tears...")
exit(1) exit(1)
list_set = list(set([i.set_name for i in values])) # get all nft named set once
for set_name in list_set:
res = run_command(f"nft list set filter {set_name}")
if not (args.dry_run or (config.has_option('GLOBAL', 'verbose') and config['GLOBAL'].getboolean('dry_run', fallback=False))):
if "ipv4_addr" in res or "ipv6_addr" in res:
logging.debug(f"set {set_name} well defined")
else:
logging.error(f'Type of the {set_name} set, not defined on "ipv4_addr" or "ipv6_addr"')
exit(1)
logging.info("# End of Parsing") logging.info("# End of Parsing")
@ -114,7 +120,7 @@ def update_dns() -> None:
logging.info(f"Updating the IPv{i.typeof} for {i.fqdn} with {i.ip_list}") logging.info(f"Updating the IPv{i.typeof} for {i.fqdn} with {i.ip_list}")
apply_config_entry(i, old_ip_list=old_ip_list) apply_config_entry(i, old_ip_list=old_ip_list)
else: else:
logging.info(f"Nothing have change for the IPv{i.typeof} for {i.fqdn}") logging.debug(f"Nothing have change for the IPv{i.typeof} for {i.fqdn}")
values = [i for i in values if i.ip_list is not None] values = [i for i in values if i.ip_list is not None]
@ -124,16 +130,16 @@ def get_next_run_timer() -> datetime:
def apply_config_entry(one_entry: entry.ModelEntry, old_ip_list: List[IPvAnyAddress] | None) -> None: def apply_config_entry(one_entry: entry.ModelEntry, old_ip_list: List[IPvAnyAddress] | None) -> None:
if old_ip_list: if old_ip_list:
run_command(f"nft delete element filter {one_entry.set_name} {{{', '.join([str(ip) for ip in old_ip_list])}}}") run_command(f"nft delete element {one_entry.family} {one_entry.table} {one_entry.set_name} {{{', '.join([str(ip) for ip in old_ip_list])}}}")
if one_entry.ip_list: if one_entry.ip_list:
run_command(f"nft add element filter {one_entry.set_name} {{{', '.join([str(ip) for ip in one_entry.ip_list])}}}") run_command(f"nft add element {one_entry.family} {one_entry.table} {one_entry.set_name} {{{', '.join([str(ip) for ip in one_entry.ip_list])}}}")
def remove_config_entries(): def remove_config_entries():
logging.info("Cleaning all entries") logging.info("Cleaning all entries")
for i in values: for i in values:
run_command(f"nft delete element filter {i.set_name} {{{', '.join([str(ip) for ip in i.ip_list])}}}") run_command(f"nft delete element {i.family} {i.table} {i.set_name} {{{', '.join([str(ip) for ip in i.ip_list])}}}")
def run_command(cmd: str) -> str: def run_command(cmd: str) -> str:
@ -149,7 +155,7 @@ def run_command(cmd: str) -> str:
logging.error("The nft command isn't found, Run with --dry-run to avoid nftable change tries") logging.error("The nft command isn't found, Run with --dry-run to avoid nftable change tries")
exit(1) exit(1)
else: else:
logging.debug("Dry-run detected, logging only") logging.debug("Dry-run detected, logging only, the previous command isn't executed")
def run_loop(): def run_loop():