@ -27,7 +27,7 @@ from prettytable import PrettyTable
from pymysql import Connection
Revisions = Dict[str, Dict[int, str]]
DBSchema = Dict[int, List[str]]
DBSchema = Dict[int, Dict[int, List[str] ]]
ConfigDict = Dict[str, str | int | bool]
@ -847,11 +847,11 @@ def get_node_roles() -> ConfigDict:
f'[{default_section}]\n{file.read()}'
)
for k, v in roles_config[default_section].items():
m = re.search('ngcp_is_(\\w+)', k)
if m:
val = True if v == '"yes"' else False
node_roles[m.group(1)] = val
elif k == 'ngcp_type' :
if v == '"yes"' or v == '"no"':
node_roles[k] = True if v == '"yes"' else False
elif v == 'true' or v == 'false':
node_roles[k] = True if v == 'true' else False
else :
node_roles[k] = v.replace('"', '')
return node_roles
@ -909,27 +909,50 @@ def get_db_schema() -> DBSchema:
debug(f'found existing {config.db_schema_table} table')
with_site_id = check_column_exists('site_id')
if with_site_id:
debug(f'found existing {config.db_schema_table}.site_id column')
with_site_id = True
if getattr(config, 'downgrade_release', False):
if not check_release_column_exists():
if not check_column_exists('release' ):
error('Cannot use downgrade release on the old database version')
shutdown(1)
cursor.execute(f"""
SELECT revision, node
FROM {config.db_schema_table}
WHERE `release` = %s
""", config.downgrade_release)
if with_site_id:
cursor.execute(f"""
SELECT revision, node, site_id
FROM {config.db_schema_table}
WHERE `release` = %s
""", config.downgrade_release)
else:
cursor.execute(f"""
SELECT revision, node, NULL as site_id
FROM {config.db_schema_table}
WHERE `release` = %s
""", config.downgrade_release)
else:
cursor.execute(f"""
SELECT revision, node
FROM {config.db_schema_table}
""")
if with_site_id:
cursor.execute(f"""
SELECT revision, node, site_id
FROM {config.db_schema_table}
""")
else:
cursor.execute(f"""
SELECT revision, node, NULL as site_id
FROM {config.db_schema_table}
""")
for row in cursor:
revision = row[0]
node = row[1]
site_id = row[2]
if not site_id:
site_id = 0
if revision not in db_schema:
db_schema[revision] = []
db_schema[revision].append(node)
db_schema[revision] = {}
if site_id not in db_schema[revision]:
db_schema[revision][site_id] = []
db_schema[revision][site_id].append(node)
cursor.close()
config.db_conn.commit()
@ -946,7 +969,7 @@ def get_db_schema() -> DBSchema:
return db_schema
def check_release_ column_exists() -> bool:
def check_column_exists(column_name: str ) -> bool:
"""Checks existence of ngcp.db_schema release colimn.
Args:
@ -961,9 +984,10 @@ def check_release_column_exists() -> bool:
cursor.execute("""
SELECT COUNT(column_name)
FROM information_schema.columns
WHERE concat(table_schema ,'.', table_name) = %s
AND column_name = 'release'
""", config.db_schema_table)
WHERE table_schema = %s
AND table_name = %s
AND column_name = %s
""", ('ngcp', 'db_schema', column_name))
column_exists = 0
row = cursor.fetchone()
@ -1101,7 +1125,7 @@ def apply_up_script(script_name: str, revision: int) -> int:
The script is skipped if:
- it is 'replicated' and already present in db_schema[revision]
- it is 'not_replicated' and already present in
db_schema[revision][node_name]
db_schema[revision][site_id][ node_name]
The script and further scripts are stopped if:
- config.to_revision > 0 and the current revision number
@ -1130,10 +1154,22 @@ def apply_up_script(script_name: str, revision: int) -> int:
db_schema = config.db_schema
not_replicated = 'not_replicated' in script_name
revision_in_db_schema = False
node_in_db_schema = False
site_id = int(config.node_roles.get('ngcp_site_id') or 1)
common_site_id = 0
node_roles = config.node_roles
is_db_role = node_roles.get('ngcp_is_db') is True
if revision in db_schema:
node_in_db_schema = node_name in db_schema[revision]
revision_in_db_schema = True
if site_id in db_schema[revision]:
if node_name in db_schema[revision][site_id]:
node_in_db_schema = True
if not node_in_db_schema and common_site_id in db_schema[revision]:
if node_name in db_schema[revision][common_site_id]:
node_in_db_schema = True
to_revision = getattr(config, 'to_revision', 0)
if to_revision > 0 and revision > config.to_revision:
@ -1141,7 +1177,7 @@ def apply_up_script(script_name: str, revision: int) -> int:
f'is greater than {config.to_revision}')
return 0
if revision in db_schema:
if revision_in_ db_schema:
if not not_replicated or node_in_db_schema:
debug(f'skip already applied revision {script_name}')
return 1
@ -1166,28 +1202,45 @@ def apply_up_script(script_name: str, revision: int) -> int:
config.db_conn.ping(reconnect=True)
if not config.batch_mode:
with config.db_conn.cursor() as cursor:
cursor.execute(f"""
INSERT INTO {config.db_schema_table}
(revision, node) VALUES (%s, %s)
""", (revision, node_name))
if is_db_role and check_column_exists('site_id'):
cursor.execute(f"""
INSERT INTO {config.db_schema_table}
(revision, node, site_id) VALUES (%s, %s, %s)
""", (revision, node_name, site_id))
else:
cursor.execute(f"""
INSERT INTO {config.db_schema_table}
(revision, node) VALUES (%s, %s)
""", (revision, node_name))
config.db_conn.commit()
config._script_was_applied = True
else:
sql = c_str(f"""
INSERT INTO {config.db_schema_table}
(revision, node)
VALUES
({revision}, '{node_name}')
""")
sql = ''
if is_db_role and check_column_exists('site_id'):
sql = c_str(f"""
INSERT INTO {config.db_schema_table}
(revision, node)
VALUES
({revision}, '{node_name}')
""")
else:
sql = c_str(f"""
INSERT INTO {config.db_schema_table}
(revision, node, site_id)
VALUES
({revision}, '{node_name}', {site_id})
""")
config._temp_sql_file.write(
str(f'\n; {sql}; COMMIT;\n').encode()
)
config._post_set_release_for_revisions.append(revision)
if revision in db_schema:
db_schema[revision].append(node_name)
else:
db_schema[revision] = [node_name]
if revision not in db_schema:
db_schema[revision] = {}
if site_id not in db_schema[revision]:
db_schema[revision][site_id] = []
db_schema[revision][site_id].append(node_name)
if not_replicated:
config._not_replicated_was_applied = True
@ -1237,10 +1290,20 @@ def apply_down_script(script_name: str, revision: int) -> int:
db_schema = config.db_schema
not_replicated = 'not_replicated' in script_name
revision_in_db_schema = False
node_in_db_schema = False
site_id = int(config.node_roles.get('ngcp_site_id') or 1)
common_site_id = 0
if revision in db_schema:
node_in_db_schema = node_name in db_schema[revision]
revision_in_db_schema = True
if site_id in db_schema[revision]:
if node_name in db_schema[revision][site_id]:
node_in_db_schema = True
if not node_in_db_schema and common_site_id in db_schema[revision]:
if node_name in db_schema[revision][common_site_id]:
node_in_db_schema = True
site_id = 0
to_revision = getattr(config, 'to_revision', 0)
if to_revision > 0 and revision <= config.to_revision:
@ -1252,7 +1315,7 @@ def apply_down_script(script_name: str, revision: int) -> int:
log(f'skip non-removable revision {script_name}')
return 1
if revision not in db_schema or \
if not revision_in_ db_schema or \
not_replicated and not node_in_db_schema:
debug(f'skip already removed revision {script_name}')
return 1
@ -1281,11 +1344,19 @@ def apply_down_script(script_name: str, revision: int) -> int:
with config.db_conn.cursor() as cursor:
rows = 0
if not_replicated:
rows = cursor.execute(f"""
DELETE FROM {config.db_schema_table}
WHERE revision = %s
AND node = %s
""", (revision, node_name))
if site_id and check_column_exists('site_id'):
rows = cursor.execute(f"""
DELETE FROM {config.db_schema_table}
WHERE revision = %s
AND node = %s
AND site_id = %s
""", (revision, node_name, site_id))
else:
rows = cursor.execute(f"""
DELETE FROM {config.db_schema_table}
WHERE revision = %s
AND node = %s
""", (revision, node_name))
else:
rows = cursor.execute(f"""
DELETE FROM {config.db_schema_table}
@ -1302,6 +1373,8 @@ def apply_down_script(script_name: str, revision: int) -> int:
""")
if not_replicated:
sql += f" AND node = '{node_name}'"
if site_id and check_column_exists('site_id'):
sql += f" AND site_id = '{site_id}'"
config._temp_sql_file.write(
str(f'\n; {sql}; COMMIT;\n').encode()
@ -1309,7 +1382,9 @@ def apply_down_script(script_name: str, revision: int) -> int:
if revision in db_schema:
if not_replicated:
db_schema[revision].remove(node_name)
db_schema[revision][site_id].remove(node_name)
if not db_schema[revision][site_id]:
del db_schema[revision][site_id]
else:
del db_schema[revision]
@ -1341,7 +1416,7 @@ def post_set_release_for_applied_revisions() -> None:
return
if not config._post_set_release_for_revisions:
return
if not check_release_ column_exists():
if not check_column_exists('release' ):
return
release = getattr(config, 'set_release', config.ngcp_version)
@ -1380,12 +1455,15 @@ def sync_ro_db() -> None:
None
"""
ngcp_type = config.node_roles['ngcp_type']
node_roles = config.node_roles
ngcp_type = node_roles['ngcp_type']
is_proxy_role = node_roles.get('ngcp_is_proxy') is True
if ngcp_type != 'carrier':
debug('skip sync R/O database on non "carrier" nodes')
return
if 'proxy' not in config.node_roles or config.node_roles['proxy'] is Fals e:
if not is_proxy_rol e:
debug(c_str("""
skip R/O database sync because this node
does not have the 'proxy' role.
@ -1603,7 +1681,7 @@ def show_release_info() -> None:
None
"""
if not check_release_ column_exists():
if not check_column_exists('release' ):
error('Cannot use get release info on the old database version')
shutdown(1)
@ -1656,6 +1734,10 @@ def show_start_summary() -> None:
log(f'{arg}: {v}')
log('-----------------------------------')
log('node_name: ' + config.node_name)
node_roles = config.node_roles
if node_roles.get('ngcp_is_multi_site'):
site_id = str(node_roles.get('ngcp_site_id') or 1)
log('site_id: ' + site_id)
log('node_state: ' + config.node_state)
log('ngcp_version: ' + config.ngcp_version)
log('-----------------------------------')
@ -1704,7 +1786,7 @@ def main() -> None:
sync_ro_db()
log('done in %.2fs.' % (time.time()-started_at))
shutdown(0)
except Exception as err: # noqa: B902
except RuntimeError as err: # noqa: B902
error(str(err))
shutdown(1)