mirror of https://github.com/sipwise/db-schema.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1796 lines
55 KiB
1796 lines
55 KiB
#!/bin/env python3
|
|
"""This script takes care of NGCP database versioning.
|
|
|
|
It can initialise, upgrade and downgrade an NGCP database
|
|
using .up and .down scripts
|
|
|
|
For more info you can run check the docstring descriptions or
|
|
run the script with --help
|
|
"""
|
|
import argparse
|
|
import configparser
|
|
import json
|
|
import os
|
|
import re
|
|
import signal
|
|
import sys
|
|
import tempfile
|
|
import textwrap
|
|
import time
|
|
from io import BufferedRandom
|
|
from pathlib import Path
|
|
from subprocess import PIPE, Popen
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import pymysql.cursors
|
|
from prettytable import PrettyTable
|
|
from pymysql import Connection
|
|
|
|
Revisions = Dict[str, Dict[int, str]]
|
|
DBSchema = Dict[int, Dict[int, List[str]]]
|
|
ConfigDict = Dict[str, str | int | bool]
|
|
|
|
|
|
class RevisionApplyError(Exception):
|
|
"""Exception for revision apply error."""
|
|
pass
|
|
|
|
|
|
class Config:
|
|
"""Config class.
|
|
|
|
Contains all the script collected configuration, states, etc.
|
|
|
|
It's a global variable and available throughout the script but
|
|
it is instantiated in main()
|
|
|
|
Attributes:
|
|
db_socket_user (str): database user when connected via socket
|
|
db_socket (str): database socket
|
|
db_ro_socket (str): database socket of R/O instance
|
|
db_defaults_conf_file (str): database defaults conf file to use when
|
|
socket is not available
|
|
db_connect_method (str): 'socket' or 'config' (default: socket)
|
|
|
|
db_schema_table (str): database schema table name
|
|
node_name_file (str): node name file
|
|
roles_file (str): node roles file
|
|
db_file (str): node db config file
|
|
ngcp_sync_db_script (str): ngcp sync db script
|
|
|
|
ngcp_version_file: (str): file containing ngcp version string
|
|
|
|
automated (bool): wether the script is run in the automated mode
|
|
that automatically init the db schema if it's empty.
|
|
this option is taken from either
|
|
AUTOMATED_INSTALL_MODE os.environ
|
|
(backward compatibility) or from the args
|
|
skip_ro_db_sync (bool): skip db synchronisation (if applicable)
|
|
this option is taken from either SKIP_SYNC_DB
|
|
os.environ (backward compatibility) or from the args
|
|
debug (bool): debug/verbose mode
|
|
mode (str): 'up' mode or 'down' mode
|
|
to_revision (int): processes revisions depending on the mode:
|
|
- mode=up: applies all revisions including to_revision
|
|
- mode=down: removes all revisions excluding to_revision
|
|
set_release (str): specify release version that is set
|
|
to all applied .up scripts instead of fetching it from
|
|
/etc/ngcp_version. Only works with mode=up
|
|
downgrade_release (str): downgrades the specified release
|
|
mutually exclusive with 'mode'
|
|
force (bool): run this script on the active node
|
|
batch_mode (bool): instead of applying scripts one by one, collects
|
|
them all and builds a single sql file that is applied in one go
|
|
db_scripts_dir (str): directory containing .up/.down scripts
|
|
get_release_info (bool): instead of applying revisionins, it returns
|
|
a list of applied releases
|
|
as_json (bool): return result from get-release-info as json rather
|
|
than a table (and potentially other return data in the future)
|
|
ngcp_upgrade (bool): defines if the script is run under 'ngcp-upgrade'
|
|
this option automatically uses NGCP_UPGRADE os.environ,
|
|
unless provided
|
|
supported_args (List[str]): a list of the attributes described above
|
|
that can be also passed args, to automate the code
|
|
|
|
_args (Any): parsed argspare result (usually Namespace())
|
|
_subproc (Popen): Popen handler to an opened subprocess, if any
|
|
_prompt_is_shown (bool): if the prompt is shown,
|
|
a helper to handle SIGINT
|
|
_interrupted (bool): if the script was interrupted with SIGINT
|
|
_not_replicated_was_applied (bool): if at least one not_replicated
|
|
script was applied/removed
|
|
_temp_sql_file (BufferedRandom): used to accumulate the sql scripts
|
|
data when batch mode is enabled
|
|
_run_cmd_mysql_options (List[str]) = extra options that are passed
|
|
to every invokation of run_cmd() that calls the 'mysql' console
|
|
command. needed to use same connection (socket/config/user, etc.)
|
|
as the pymysql db connection
|
|
_scripts_dir_order (List[str]): order to apply scripts
|
|
from the subdirectories
|
|
_post_set_release_for_revisions (List[int]): set release in the
|
|
db_schema table for all revisions that were applied when
|
|
the `release` column was not available
|
|
|
|
|
|
node_name (str): current node name
|
|
node_roles (ConfigDict): current node roles
|
|
node_dbconf (ConfigDict): current node db config
|
|
node_state (str): current node state (active/inactive,etc.)
|
|
ngcp_version (str): current NGCP version
|
|
db_conn (Connection): database connection handler
|
|
db_schema (DBSchema): database schema revisions state
|
|
revisions (Revisions): revisions that are read from
|
|
the scripts directory
|
|
|
|
sync_db_databases (str): a string containing a space separated list
|
|
of databases that ngcp-sync-db will synchronise
|
|
sync_db_ignore_tables (str): a string containing a space separated list
|
|
of tables that ngcp-sync-db should ignore
|
|
|
|
"""
|
|
|
|
# db options
|
|
db_socket_user: str = 'root'
|
|
db_socket: str = '/run/mysqld/mysqld.sock'
|
|
db_ro_socket: str = '/run/mysqld/mysqld2.sock'
|
|
db_defaults_conf_file: str = '/etc/mysql/sipwise_extra.cnf'
|
|
db_connect_method: str = 'socket'
|
|
|
|
# required config files location options
|
|
db_schema_table: str = 'ngcp.db_schema'
|
|
node_name_file: str = '/etc/ngcp_nodename'
|
|
roles_file: str = '/etc/default/ngcp-roles'
|
|
db_file: str = '/etc/default/ngcp-db'
|
|
ngcp_sync_db_script: str = '/usr/sbin/ngcp-sync-db'
|
|
ngcp_version_file: str = '/etc/ngcp_version'
|
|
|
|
# default config options
|
|
automated: bool = False
|
|
skip_ro_db_sync: bool
|
|
force_ro_db_sync: bool
|
|
debug: bool = False
|
|
mode: str = 'up'
|
|
to_revision: int
|
|
set_release: str
|
|
downgrade_release: str
|
|
force: bool = False
|
|
batch_mode: bool = False
|
|
db_scripts_dir: str = '/usr/share/ngcp-db-schema/db_scripts'
|
|
get_release_info: bool = False
|
|
as_json: bool = False
|
|
ngcp_upgrade: bool = False
|
|
supported_args: List[str] = ['automated', 'batch_mode',
|
|
'db_socket', 'db_connect_method',
|
|
'db_defaults_conf_file',
|
|
'db_scripts_dir',
|
|
'debug', 'force',
|
|
'mode',
|
|
'skip_ro_db_sync',
|
|
'force_ro_db_sync',
|
|
'to_revision',
|
|
'downgrade_release',
|
|
'set_release',
|
|
'get_release_info',
|
|
'as_json',
|
|
'ngcp_upgrade']
|
|
|
|
# internal attributes
|
|
_args: Any = None
|
|
_subproc: Optional[Popen[Any]] = None
|
|
_prompt_is_shown: bool = False
|
|
_interrupted: bool = False
|
|
_script_was_applied: bool = False
|
|
_not_replicated_was_applied: bool = False
|
|
_temp_sql_file: BufferedRandom = None # type: ignore
|
|
_run_cmd_mysql_options: List[str] = []
|
|
_scripts_dir_order: List[str] = ['init', 'base', 'diff']
|
|
_post_set_release_for_revisions: List[int] = []
|
|
|
|
# attributes that are initialised in __init__()
|
|
node_name: str
|
|
node_roles: ConfigDict
|
|
node_dbconf: ConfigDict
|
|
node_state: str
|
|
ngcp_version: str
|
|
db_conn: Connection # type: ignore
|
|
db_schema: DBSchema
|
|
revisions: Revisions
|
|
|
|
# ngcp sync db related options
|
|
sync_db_databases = 'ngcp billing carrier kamailio freeswitch ' + \
|
|
'provisioning prosody'
|
|
sync_db_ignore_tables = 'kamailio.voicemail_spool ' + \
|
|
'provisioning.autoprov_firmwares_data ' + \
|
|
'provisioning.voip_fax_data ' + \
|
|
'billing.journals'
|
|
|
|
def __init__(self) -> None:
|
|
"""Config class constructor.
|
|
|
|
Takes over SIGINT handling
|
|
|
|
"""
|
|
|
|
def handle_signal(signum: int, _: Any) -> None:
|
|
if self._prompt_is_shown:
|
|
log('aborted.')
|
|
shutdown(0)
|
|
if self._subproc:
|
|
log('waiting for the subprocess to finish...')
|
|
self._subproc.wait()
|
|
self._interrupted = True
|
|
|
|
signal.signal(signal.SIGINT, handle_signal)
|
|
|
|
def setup_config(self) -> None:
|
|
"""Prepares info that is used by the script."""
|
|
self.node_name = get_node_name()
|
|
self.node_state = get_node_state()
|
|
self.node_roles = get_node_roles()
|
|
self.node_dbconf = get_node_dbconf()
|
|
self.ngcp_version = get_ngcp_version()
|
|
|
|
def setup_db(self) -> None:
|
|
"""Prepares database related setup."""
|
|
self.db_conn = connect_db()
|
|
self.db_schema = get_db_schema()
|
|
|
|
|
|
class MyRawTextHelpFormatter(argparse.RawDescriptionHelpFormatter):
|
|
"""Class that provides with better argparse help formatting."""
|
|
|
|
def __add_whitespace(self, idx: int, w_space: int, text: str) -> Any:
|
|
"""Adds whitespace formatting."""
|
|
if idx == 0:
|
|
return text
|
|
return (' ' * w_space) + text
|
|
|
|
def _split_lines(self, text: Any, width: int) -> Any:
|
|
"""Handles wrapping and bullet points formatting."""
|
|
text = text.splitlines()
|
|
for idx, line in enumerate(text):
|
|
if not re.match(r'^\s*$', line):
|
|
line = line.strip() + '\n'
|
|
search = re.search(r'\s*[0-9\-]{0,}\.?\s*', line)
|
|
if line.strip() == '':
|
|
text[idx] = ''
|
|
elif search:
|
|
l_space = search.end()
|
|
lines = [self.__add_whitespace(i, l_space, x)
|
|
for i, x in enumerate(textwrap.wrap(line, width))]
|
|
text[idx] = lines
|
|
|
|
return [item for sublist in text for item in sublist]
|
|
|
|
|
|
"""Instantiated in main()."""
|
|
config: Config
|
|
|
|
|
|
def _logit(l_str: str, indent: int, end: str, s_char: str = '->') -> None:
|
|
"""Formats and prints the log string.
|
|
|
|
Used by public functions such as log(), debug(), error()
|
|
|
|
Args:
|
|
l_str (str): log string
|
|
indent (int): indentation level
|
|
end (str): line ending string
|
|
s_char: line start string
|
|
|
|
Returns:
|
|
None
|
|
|
|
"""
|
|
print('%s%s %s' % (' ' * 4 * indent, s_char, l_str), end=end)
|
|
|
|
|
|
def log(l_str: str, indent: int = 0, end: str = '\n') -> None:
|
|
"""Prints the log string.
|
|
|
|
Public function used in the code to always print a log string
|
|
|
|
Args:
|
|
l_str (str): log string
|
|
indent (int): indentation level
|
|
end (str): line ending string
|
|
|
|
Returns:
|
|
None
|
|
|
|
"""
|
|
_logit(l_str, indent, end)
|
|
|
|
|
|
def debug(l_str: str, indent: int = 0, end: str = '\n') -> None:
|
|
"""Print the debug string.
|
|
|
|
Public function used in the code to print debug info,
|
|
only if debug is enabled
|
|
|
|
Args:
|
|
l_str (str): log string
|
|
indent (int): indentation level
|
|
end (str): line ending string
|
|
|
|
Returns:
|
|
None
|
|
|
|
"""
|
|
if config.debug:
|
|
_logit(l_str, indent, end, '=>')
|
|
|
|
|
|
def error(l_str: str, indent: int = 0, end: str = '\n') -> None:
|
|
"""Print the error string.
|
|
|
|
Public function used in the code to print error
|
|
|
|
Args:
|
|
l_str (str): log string
|
|
indent (int): indentation level
|
|
end (str): line ending string
|
|
|
|
Returns:
|
|
None
|
|
|
|
"""
|
|
_logit(f'Error: {l_str}', indent, end, '!>')
|
|
|
|
|
|
def str_to_bool(in_str: str) -> bool:
|
|
"""Converts a string into its boolean representation.
|
|
|
|
'true', 'yes', '1' - boolean strings
|
|
|
|
Args:
|
|
in_str (str): input string
|
|
|
|
Returns:
|
|
bool: True if the string has boolean representation,
|
|
otherwise False
|
|
|
|
"""
|
|
return in_str.lower() in ('true', 'yes', '1')
|
|
|
|
|
|
def c_str(in_str: str) -> str:
|
|
"""Makes a string compact by trimming excess whitespace.
|
|
|
|
Args:
|
|
in_str (str): input string
|
|
|
|
Returns:
|
|
str: compact string
|
|
|
|
"""
|
|
out_str = in_str.strip()
|
|
return re.sub(r'\s{2,}|\n', ' ', out_str, flags=re.MULTILINE)
|
|
|
|
|
|
def connect_db() -> Connection: # type: ignore
|
|
"""Connects to the database to perform versioning on.
|
|
|
|
Args:
|
|
None
|
|
|
|
Returns:
|
|
Connection: database connection handler
|
|
|
|
"""
|
|
db_host = config.node_dbconf['pair_dbhost']
|
|
db_port = int(config.node_dbconf['pair_dbport'])
|
|
db_user = config.db_socket_user
|
|
db_socket = config.db_socket
|
|
db_defaults_conf_file = config.db_defaults_conf_file
|
|
db_connect_method = config.db_connect_method
|
|
|
|
socket_available = os.access(str(db_socket), os.R_OK)
|
|
|
|
if db_connect_method == 'socket' and not socket_available:
|
|
debug(c_str(f"""
|
|
socket file {db_socket} is not readable,
|
|
using {db_defaults_conf_file}
|
|
"""))
|
|
db_connect_method == 'config'
|
|
|
|
if db_connect_method == 'socket':
|
|
config._run_cmd_mysql_options = [
|
|
'-h', str(db_host), '-P', str(db_port),
|
|
'-u', db_user, '-S', db_socket
|
|
]
|
|
try:
|
|
db_conn: Connection = pymysql.connect( # type: ignore
|
|
host=db_host,
|
|
port=db_port,
|
|
user=db_user,
|
|
unix_socket=db_socket,
|
|
autocommit=0,
|
|
)
|
|
debug(
|
|
'connected to database %s:%d via socket %s as %s' %
|
|
(db_host, db_port, db_socket, db_user)
|
|
)
|
|
except pymysql.Error as e:
|
|
error(
|
|
'could not connect to database %s:%d via socket %s as %s: %s' %
|
|
(db_host, db_port, db_socket, db_user, e)
|
|
)
|
|
shutdown(1)
|
|
elif db_connect_method == 'config':
|
|
config._run_cmd_mysql_options = [
|
|
f'--defaults-extra-file={db_defaults_conf_file}',
|
|
'-h', str(db_host), '-P', str(db_port),
|
|
]
|
|
try:
|
|
with open(db_defaults_conf_file) as file:
|
|
for line in file.readlines():
|
|
m = re.match(r'^user\s+=\s+(\S+)', line)
|
|
if m:
|
|
db_user = m.group(1)
|
|
break
|
|
except FileNotFoundError as e:
|
|
error(f'cannot access {db_defaults_conf_file}: {e}')
|
|
shutdown(1)
|
|
|
|
try:
|
|
db_conn: Connection = pymysql.connect( # type: ignore
|
|
host=db_host,
|
|
port=db_port,
|
|
user=db_user,
|
|
read_default_file=db_defaults_conf_file,
|
|
autocommit=0,
|
|
)
|
|
log(
|
|
'connected to database %s:%d via config %s as %s' %
|
|
(db_host, db_port, db_defaults_conf_file, db_user)
|
|
)
|
|
except pymysql.Error as e:
|
|
error(
|
|
'could not connect to database %s:%d via config %s as %s: %s' %
|
|
(db_host, db_port, db_defaults_conf_file, db_user, e)
|
|
)
|
|
shutdown(1)
|
|
else:
|
|
error('unknown database connection scenario')
|
|
shutdown(1)
|
|
|
|
db_conn.query('SET SESSION wait_timeout = 600;')
|
|
|
|
return db_conn
|
|
|
|
|
|
def parse_args() -> None:
|
|
"""Parses command line arguments.
|
|
|
|
Stores them in config.args upon success as well as
|
|
Update the corresponding config attributes provided in
|
|
config.supported_args
|
|
|
|
Args:
|
|
None
|
|
|
|
Returns:
|
|
None
|
|
|
|
"""
|
|
parser = argparse.ArgumentParser(
|
|
description=c_str("""
|
|
NGCP database schema versioning tool that upgrades
|
|
and downgrades the database schema state.
|
|
"""),
|
|
formatter_class=MyRawTextHelpFormatter,
|
|
allow_abbrev=False,
|
|
argument_default=argparse.SUPPRESS,
|
|
)
|
|
|
|
ro_db_sync_group = parser.add_mutually_exclusive_group()
|
|
downgrade_group = parser.add_mutually_exclusive_group()
|
|
|
|
# Define arguments
|
|
parser.add_argument(
|
|
'--verbose', '-v',
|
|
dest='debug',
|
|
action='store_true',
|
|
help="""
|
|
enable verbose output (debug mode).
|
|
""")
|
|
|
|
parser.add_argument(
|
|
'--force', '-f',
|
|
dest='force',
|
|
action='store_true',
|
|
help="""
|
|
force the database schema to be applied even if the node is inactive.
|
|
""")
|
|
|
|
downgrade_group.add_argument(
|
|
'--mode', '-m',
|
|
dest='mode',
|
|
type=str,
|
|
help="""
|
|
schema upgrade mode:
|
|
|
|
- 'up' (default): upgrades the database
|
|
- 'down': downgrades the database.
|
|
|
|
mutually exclusive with --downgrade-release
|
|
""")
|
|
|
|
parser.add_argument(
|
|
'--to-revision', '-t',
|
|
dest='to_revision',
|
|
type=int,
|
|
help="""
|
|
max revision to upgrade/downgrade the schema depending on the mode:
|
|
|
|
- mode=up: applies all revisions including --to-revision
|
|
- mode=down: removes all revisions excluding --to-revision
|
|
""")
|
|
|
|
parser.add_argument(
|
|
'--set-release',
|
|
dest='set_release',
|
|
type=str,
|
|
help="""
|
|
set release to a specific version instead of fetching it from
|
|
/etc/ngcp_version.
|
|
|
|
only works with mode=up
|
|
""")
|
|
|
|
downgrade_group.add_argument(
|
|
'--downgrade-release',
|
|
dest='downgrade_release',
|
|
type=str,
|
|
help="""
|
|
downgrade all applied revisions of the specified release
|
|
|
|
mutually exclusive with --mode
|
|
""")
|
|
|
|
parser.add_argument(
|
|
'--automated', '-a',
|
|
dest='automated',
|
|
action='store_true',
|
|
help="""
|
|
automated mode, does not prompt about an empty db_schema and
|
|
automatically initialises it if empty '(including the revision).
|
|
""")
|
|
|
|
parser.add_argument(
|
|
'--batch-mode', '-b',
|
|
dest='batch_mode',
|
|
action='store_true',
|
|
help="""
|
|
(Experimental batch mode)
|
|
Instead of applying scripts and sql statements one by one,
|
|
they are accumulated and applied all at once.
|
|
""")
|
|
|
|
ro_db_sync_group.add_argument(
|
|
'--skip-ro-db-sync',
|
|
dest='skip_ro_db_sync',
|
|
action='store_true',
|
|
help="""
|
|
(Carrier only): skip automatic ngcp-sync-db invokation on proxy
|
|
nodes if at least one not_replicated scripts was applied.
|
|
""")
|
|
|
|
ro_db_sync_group.add_argument(
|
|
'--force-ro-db-sync',
|
|
dest='force_ro_db_sync',
|
|
action='store_true',
|
|
help="""
|
|
(Carrier only): force automatic ngcp-sync-db invokation on proxy
|
|
nodes even if no not_replicated scripts were applied.
|
|
""")
|
|
|
|
parser.add_argument(
|
|
'--db-socket',
|
|
dest='db_socket',
|
|
type=str,
|
|
help=f"""
|
|
Database socket file to use instead of the default one
|
|
({config.db_socket})
|
|
""")
|
|
|
|
parser.add_argument(
|
|
'--db-defaults-conf-file',
|
|
dest='db_defaults_conf_file',
|
|
type=str,
|
|
help=f"""
|
|
Database defaults config file to use insted of the default one
|
|
({config.db_defaults_conf_file})
|
|
""")
|
|
|
|
parser.add_argument(
|
|
'--db-connect-method',
|
|
dest='db_connect_method',
|
|
choices=['socket', 'config'],
|
|
type=str,
|
|
help=f"""
|
|
Force database connect method instead of the default one
|
|
({config.db_connect_method})
|
|
""")
|
|
|
|
parser.add_argument(
|
|
'--db-scripts-dir',
|
|
dest='db_scripts_dir',
|
|
type=str,
|
|
help=f"""
|
|
Override database .up/.down scripts directory instead of the
|
|
default one ({config.db_scripts_dir})
|
|
""")
|
|
|
|
parser.add_argument(
|
|
'--get-release-info',
|
|
dest='get_release_info',
|
|
action='store_true',
|
|
help="""
|
|
Fetch a list of releases and min/max revision per release.
|
|
""")
|
|
|
|
parser.add_argument(
|
|
'--as-json',
|
|
dest='as_json',
|
|
action='store_true',
|
|
help="""
|
|
Return data is represented as JSON
|
|
|
|
(only works for --get-release-info)
|
|
""")
|
|
|
|
parser.add_argument(
|
|
'--ngcp-upgrade', '-u',
|
|
dest='ngcp_upgrade',
|
|
action='store_true',
|
|
help="""
|
|
ngcp upgrade mode, affects certain aspects of applying up scripts
|
|
and sync R/O db
|
|
""")
|
|
|
|
args = parser.parse_args()
|
|
|
|
for arg in config.supported_args:
|
|
if hasattr(args, arg):
|
|
config.__dict__[arg] = getattr(args, arg)
|
|
elif (arg == 'automated'):
|
|
config.automated = str_to_bool(
|
|
os.getenv('AUTOMATED_INSTALL_MODE', '')
|
|
)
|
|
elif (arg == 'skip_ro_db_sync'):
|
|
config.skip_ro_db_sync = str_to_bool(
|
|
os.getenv('SKIP_SYNC_DB', '')
|
|
)
|
|
elif (arg == 'db_socket'):
|
|
config.db_socket = os.getenv('MYSQL_SOCKET', config.db_socket)
|
|
elif (arg == 'ngcp_upgrade'):
|
|
config.ngcp_upgrade = str_to_bool(
|
|
os.getenv('NGCP_UPGRADE', '')
|
|
)
|
|
|
|
if hasattr(args, 'downgrade_release'):
|
|
config.mode = 'down'
|
|
|
|
if hasattr(config, 'set_release') and config.mode == 'down':
|
|
parser.print_help(sys.stderr)
|
|
print(
|
|
sys.argv[0] +
|
|
' ' +
|
|
'error: argument --set-release: requires --mode=up',
|
|
file=sys.stderr
|
|
)
|
|
sys.exit(1)
|
|
|
|
config._args = args
|
|
|
|
|
|
def get_node_name() -> str:
|
|
"""Fetches node name.
|
|
|
|
Args:
|
|
None
|
|
|
|
Returns:
|
|
str: node name
|
|
|
|
"""
|
|
with open(config.node_name_file, 'r', encoding='utf8') as file:
|
|
return file.readline().strip()
|
|
|
|
|
|
def run_cmd(*cmd: Any, **kwargs: Any) -> tuple[bytes, bytes]:
|
|
"""Runs system command.
|
|
|
|
Args:
|
|
cmd (tuple[str]): command and arguments
|
|
|
|
Returns:
|
|
(bytes, bytes): stdout, stderr
|
|
|
|
"""
|
|
stdin = None
|
|
if 'stdin' in kwargs:
|
|
stdin = kwargs['stdin']
|
|
config._subproc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
|
|
out, err = config._subproc.communicate(input=stdin)
|
|
config._subproc = None
|
|
return out, err
|
|
|
|
|
|
def read_revision_file(filename: str) -> bytes:
|
|
"""Reads revision file.
|
|
|
|
Currently not used due to issues with the existing scripts
|
|
not being correctly SQL syntax compatible and only "mysql console"
|
|
compatible
|
|
|
|
Args:
|
|
filename (str): filename to read
|
|
|
|
Returns:
|
|
bytes: sql data
|
|
|
|
"""
|
|
raise NotImplementedError(c_str(
|
|
f'{sys._getframe().f_code.co_name}() must not be used'
|
|
))
|
|
|
|
sql: bytes = bytes()
|
|
default_delimiter = ';'
|
|
delimiter_set = ''
|
|
rx_delimiter_declare = re.compile(r'^\s*delimiter\s+(\S+)', re.IGNORECASE)
|
|
rx_delimiter_char: re.Pattern[str] = re.Pattern()
|
|
rx_delimiter_end: re.Pattern[str] = re.Pattern()
|
|
rx_use_stmt_no_end = re.compile(r'^use\s*\S+[^;]\s*$', re.IGNORECASE)
|
|
with open(filename, mode='rb') as file:
|
|
for b_line in file.readlines():
|
|
line = new_line = b_line.decode()
|
|
modified = False
|
|
m_delim = rx_delimiter_declare.search(line)
|
|
if m_delim:
|
|
delim = m_delim.group(1)
|
|
if delimiter_set:
|
|
if delim == default_delimiter:
|
|
delimiter_set = ''
|
|
elif delimiter_set != delim and \
|
|
delimiter_set != default_delimiter:
|
|
error(c_str(f"""
|
|
Found an unexpected delimiter combo,
|
|
active delimiter={delimiter_set},
|
|
new delimiter={delim}
|
|
"""))
|
|
shutdown(1)
|
|
else:
|
|
delimiter_set = delim
|
|
else:
|
|
delimiter_set = delim
|
|
rx_delimiter_char = re.compile(rf'\s*\{delimiter_set}\s*$')
|
|
rx_delimiter_end = re.compile(rf'\w+\s*\{delimiter_set}\s*$')
|
|
new_line = rx_delimiter_declare.sub('', line)
|
|
if new_line != line:
|
|
modified = True
|
|
elif delimiter_set:
|
|
m_char = rx_delimiter_char.search(line)
|
|
if m_char:
|
|
m_end = rx_delimiter_end.search(line)
|
|
new_line = line.replace(
|
|
f'{delimiter_set}',
|
|
m_end and f'{default_delimiter}' or '',
|
|
1
|
|
)
|
|
if line != new_line:
|
|
modified = True
|
|
m_use_no_end = rx_use_stmt_no_end.search(line.strip())
|
|
if m_use_no_end:
|
|
new_line = f'{line.strip()}{default_delimiter}\n'
|
|
if line != new_line:
|
|
modified = True
|
|
sql += new_line.encode() if modified else b_line
|
|
return sql
|
|
|
|
|
|
def get_ngcp_version() -> str:
|
|
"""Fetches NGCP version.
|
|
|
|
Args:
|
|
None
|
|
|
|
Returns
|
|
str: the version string
|
|
|
|
"""
|
|
with open(config.ngcp_version_file, 'r', encoding='utf8') as file:
|
|
return file.readline().strip()
|
|
|
|
|
|
def get_node_state() -> str:
|
|
"""Fetches node state.
|
|
|
|
state: ['active','standby','unknown','transition']
|
|
|
|
Args:
|
|
None
|
|
|
|
Returns
|
|
str: the state string
|
|
|
|
"""
|
|
stdout, stderr = run_cmd(
|
|
'/usr/sbin/ngcp-check-active', '-v', 'root'
|
|
)
|
|
node_state = 'unknown'
|
|
if stderr:
|
|
node_state = 'unknown'
|
|
else:
|
|
node_state = stdout.decode().strip()
|
|
return node_state
|
|
|
|
|
|
def get_node_roles() -> ConfigDict:
|
|
"""Fetches node roles.
|
|
|
|
Args:
|
|
None
|
|
|
|
Returns
|
|
ConfigDict: config Dict containing node roles
|
|
as well as its type
|
|
|
|
"""
|
|
roles_config = configparser.ConfigParser()
|
|
default_section = roles_config.default_section
|
|
node_roles: ConfigDict = {}
|
|
with open(config.roles_file) as file:
|
|
roles_config.read_string(
|
|
f'[{default_section}]\n{file.read()}'
|
|
)
|
|
for k, v in roles_config[default_section].items():
|
|
if v == '"yes"' or v == '"no"':
|
|
node_roles[k] = True if v == '"yes"' else False
|
|
elif v == 'true' or v == 'false':
|
|
node_roles[k] = True if v == 'true' else False
|
|
else:
|
|
node_roles[k] = v.replace('"', '')
|
|
|
|
return node_roles
|
|
|
|
|
|
def get_node_dbconf() -> ConfigDict:
|
|
"""Fetches node db config.
|
|
|
|
Db config represents hosts, ports and other info
|
|
|
|
Args:
|
|
None
|
|
|
|
Returns
|
|
ConfigDict: config Dict containing node db config
|
|
|
|
"""
|
|
db_config = configparser.ConfigParser()
|
|
default_section = db_config.default_section
|
|
node_dbconf: ConfigDict = {}
|
|
with open(config.db_file) as file:
|
|
db_config.read_string(
|
|
f'[{default_section}]\n{file.read()}'
|
|
)
|
|
for k, v in db_config[default_section].items():
|
|
node_dbconf[k.lower()] = v
|
|
return node_dbconf
|
|
|
|
|
|
def get_db_schema() -> DBSchema:
|
|
"""Fetches db schema.
|
|
|
|
Db schema contains already applied revisions
|
|
|
|
Args:
|
|
None
|
|
|
|
Returns
|
|
DbSchema: revisions applied to nodes, the return
|
|
value is an empty Dict if the db schema
|
|
is not yet initialised
|
|
|
|
"""
|
|
db_schema: DBSchema = {}
|
|
cursor: pymysql.cursors.Cursor = config.db_conn.cursor()
|
|
|
|
cursor.execute("""
|
|
SELECT table_schema, table_name
|
|
FROM information_schema.tables
|
|
WHERE concat(table_schema ,'.', table_name) = %s
|
|
""", config.db_schema_table)
|
|
if cursor.rowcount == 0:
|
|
debug(f'{config.db_schema_table} table does not exist')
|
|
return db_schema
|
|
|
|
debug(f'found existing {config.db_schema_table} table')
|
|
|
|
with_site_id = check_column_exists('site_id')
|
|
if with_site_id:
|
|
debug(f'found existing {config.db_schema_table}.site_id column')
|
|
with_site_id = True
|
|
|
|
if getattr(config, 'downgrade_release', False):
|
|
if not check_column_exists('release'):
|
|
error('Cannot use downgrade release on the old database version')
|
|
shutdown(1)
|
|
if with_site_id:
|
|
cursor.execute(f"""
|
|
SELECT revision, node, site_id
|
|
FROM {config.db_schema_table}
|
|
WHERE `release` = %s
|
|
""", config.downgrade_release)
|
|
else:
|
|
cursor.execute(f"""
|
|
SELECT revision, node, NULL as site_id
|
|
FROM {config.db_schema_table}
|
|
WHERE `release` = %s
|
|
""", config.downgrade_release)
|
|
else:
|
|
if with_site_id:
|
|
cursor.execute(f"""
|
|
SELECT revision, node, site_id
|
|
FROM {config.db_schema_table}
|
|
""")
|
|
else:
|
|
cursor.execute(f"""
|
|
SELECT revision, node, NULL as site_id
|
|
FROM {config.db_schema_table}
|
|
""")
|
|
|
|
for row in cursor:
|
|
revision = row[0]
|
|
node = row[1]
|
|
site_id = row[2]
|
|
if not site_id:
|
|
site_id = 0
|
|
if revision not in db_schema:
|
|
db_schema[revision] = {}
|
|
if site_id not in db_schema[revision]:
|
|
db_schema[revision][site_id] = []
|
|
db_schema[revision][site_id].append(node)
|
|
cursor.close()
|
|
config.db_conn.commit()
|
|
|
|
if config.mode == 'down' and not db_schema:
|
|
if config.downgrade_release:
|
|
error(c_str(f"""
|
|
could not find entries in db_schema \
|
|
table matching release '{config.downgrade_release}'
|
|
"""))
|
|
else:
|
|
error('could not find entries in db_schema')
|
|
shutdown(1)
|
|
|
|
return db_schema
|
|
|
|
|
|
def check_column_exists(column_name: str) -> bool:
|
|
"""Checks existence of ngcp.db_schema release colimn.
|
|
|
|
Args:
|
|
None
|
|
|
|
Returns
|
|
bool: True if exists, False otherwise
|
|
|
|
"""
|
|
cursor: pymysql.cursors.Cursor = config.db_conn.cursor()
|
|
|
|
cursor.execute("""
|
|
SELECT COUNT(column_name)
|
|
FROM information_schema.columns
|
|
WHERE table_schema = %s
|
|
AND table_name = %s
|
|
AND column_name = %s
|
|
""", ('ngcp', 'db_schema', column_name))
|
|
|
|
column_exists = 0
|
|
row = cursor.fetchone()
|
|
if row:
|
|
column_exists = row[0]
|
|
|
|
cursor.close()
|
|
config.db_conn.commit()
|
|
|
|
return bool(column_exists)
|
|
|
|
|
|
def get_revisions() -> Revisions:
|
|
"""Fetches revisions.
|
|
|
|
The revisions are read from the scripts directory
|
|
|
|
Args:
|
|
None
|
|
|
|
Returns
|
|
Revision: a Dict structure with revisions
|
|
and script names
|
|
|
|
|
|
"""
|
|
debug('fetching db scripts...')
|
|
mode = config.mode
|
|
revisions: Revisions = {}
|
|
for spec in config._scripts_dir_order:
|
|
rev_dir = f'{config.db_scripts_dir}/{spec}'
|
|
if not os.access(rev_dir, os.R_OK):
|
|
error(f'non-existing or empty scripts dir {rev_dir}')
|
|
shutdown(1)
|
|
revisions[spec] = {}
|
|
for path in Path(f'{config.db_scripts_dir}/{spec}').glob(f'*.{mode}'):
|
|
revision = int(path.stem.split('_')[0])
|
|
revisions[spec][revision] = path.name
|
|
return revisions
|
|
|
|
|
|
def apply_revisions(revisions: Revisions) -> None:
|
|
"""Applies revisions.
|
|
|
|
Processes through fetched revisions and depending
|
|
on the mode calls either apply_up_scipt() or apply_down_script()
|
|
|
|
If the batch mode is enabled it also invokes run_cmd() with
|
|
the accumulated config_temp_sql_file
|
|
|
|
Args:
|
|
revisions (Revisions): revisions structure
|
|
|
|
Returns
|
|
None
|
|
|
|
"""
|
|
mode = config.mode
|
|
dir_specs = config._scripts_dir_order
|
|
max_schema_revision = 0
|
|
|
|
if config.db_schema:
|
|
del dir_specs[0]
|
|
max_schema_revision = max(config.db_schema)
|
|
|
|
if mode == 'down':
|
|
dir_specs.reverse()
|
|
|
|
if config.batch_mode:
|
|
config._temp_sql_file = tempfile.NamedTemporaryFile() # type: ignore
|
|
config._temp_sql_file.write(
|
|
str(
|
|
'SET SESSION wait_timeout = 600;\n' +
|
|
'SET SESSION net_read_timeout = 600;\n' +
|
|
'SET SESSION net_write_timeout = 600;\n'
|
|
).encode()
|
|
)
|
|
|
|
try:
|
|
for spec in dir_specs:
|
|
sorted_revisions = sorted(revisions[spec].keys())
|
|
if mode == 'down':
|
|
sorted_revisions.reverse()
|
|
for revision in sorted_revisions:
|
|
script = revisions[spec][revision]
|
|
script_name = f'{spec}/{script}'
|
|
ret = 0
|
|
if mode == 'up':
|
|
ret = apply_up_script(script_name, revision)
|
|
else:
|
|
# silently skip all revisionns that are greater
|
|
# than max revision in the db_schema as there is
|
|
# nothing to remove
|
|
if max_schema_revision and revision > max_schema_revision:
|
|
continue
|
|
ret = apply_down_script(script_name, revision)
|
|
if not ret:
|
|
break
|
|
if config._interrupted:
|
|
log('stopped.')
|
|
return
|
|
else:
|
|
continue
|
|
break
|
|
|
|
if config.batch_mode and config._temp_sql_file.tell():
|
|
debug(f'using accumulated sql {config._temp_sql_file.name}')
|
|
log('applying schema changes')
|
|
config._temp_sql_file.flush()
|
|
_, stderr = run_cmd(
|
|
'/usr/bin/mysql', *config._run_cmd_mysql_options,
|
|
'-e', f'source {config._temp_sql_file.name}'
|
|
)
|
|
if stderr:
|
|
error(f'could not apply schema changes: {stderr.decode()}')
|
|
shutdown(1)
|
|
config.db_conn.commit()
|
|
config._script_was_applied = True
|
|
|
|
post_set_release_for_applied_revisions()
|
|
except RevisionApplyError:
|
|
post_set_release_for_applied_revisions()
|
|
shutdown(1)
|
|
|
|
if not config._script_was_applied:
|
|
log('nothing to apply')
|
|
|
|
|
|
def apply_up_script(script_name: str, revision: int) -> int:
|
|
"""Applies .up script.
|
|
|
|
Performs various checks to determins if the .up script
|
|
should be applied or skipped.
|
|
|
|
The script is skipped if:
|
|
- it is 'replicated' and already present in db_schema[revision]
|
|
- it is 'not_replicated' and already present in
|
|
db_schema[revision][site_id][node_name]
|
|
|
|
The script and further scripts are stopped if:
|
|
- config.to_revision > 0 and the current revision number
|
|
is greater than config.to_revision
|
|
|
|
Calls run_cmd() to apply the script file using 'mysql' consoel
|
|
|
|
If batch mode is enabled, instead of calling run_cmd() and insert
|
|
into the db_schema table applied revisions, all the data is accumulated
|
|
into config._temp_sql_file, to be then applied all at once in
|
|
apply_revisions() all at once
|
|
|
|
Upon success, insert a new entry into ngcp.db_schema
|
|
|
|
Args:
|
|
script_name (str): script name (excluding the base dir)
|
|
revision (int): revision number
|
|
|
|
Returns
|
|
int: 1 - if successfully applied or skipped,
|
|
0 - if should be stopped and future revisions will not
|
|
be applied
|
|
|
|
"""
|
|
node_name = config.node_name
|
|
db_schema = config.db_schema
|
|
|
|
not_replicated = 'not_replicated' in script_name
|
|
revision_in_db_schema = False
|
|
node_in_db_schema = False
|
|
site_id = int(config.node_roles.get('ngcp_site_id') or 1)
|
|
common_site_id = 0
|
|
|
|
node_roles = config.node_roles
|
|
is_db_role = node_roles.get('ngcp_is_db') is True
|
|
|
|
if revision in db_schema:
|
|
revision_in_db_schema = True
|
|
if site_id in db_schema[revision]:
|
|
if node_name in db_schema[revision][site_id]:
|
|
node_in_db_schema = True
|
|
if not node_in_db_schema and common_site_id in db_schema[revision]:
|
|
if node_name in db_schema[revision][common_site_id]:
|
|
node_in_db_schema = True
|
|
|
|
to_revision = getattr(config, 'to_revision', 0)
|
|
if to_revision > 0 and revision > config.to_revision:
|
|
log(f'stopping as next revision {revision} ' +
|
|
f'is greater than {config.to_revision}')
|
|
return 0
|
|
|
|
if revision_in_db_schema:
|
|
if not not_replicated or node_in_db_schema:
|
|
debug(f'skip already applied revision {script_name}')
|
|
return 1
|
|
|
|
script_filename = f'{config.db_scripts_dir}/{script_name}'
|
|
|
|
if not config.batch_mode:
|
|
log(f'applying revision {script_name}')
|
|
_, stderr = run_cmd(
|
|
'/usr/bin/mysql', *config._run_cmd_mysql_options,
|
|
'-e', f'source {script_filename}'
|
|
)
|
|
if stderr:
|
|
error(f'could not apply revision: {stderr.decode()}')
|
|
shutdown(1)
|
|
else:
|
|
log(f'adding revision {script_name}')
|
|
with open(script_filename, 'rb') as file:
|
|
config._temp_sql_file.write(file.read())
|
|
|
|
try:
|
|
config.db_conn.ping(reconnect=True)
|
|
if not config.batch_mode:
|
|
with config.db_conn.cursor() as cursor:
|
|
if is_db_role and check_column_exists('site_id'):
|
|
cursor.execute(f"""
|
|
INSERT INTO {config.db_schema_table}
|
|
(revision, node, site_id) VALUES (%s, %s, %s)
|
|
""", (revision, node_name, site_id))
|
|
else:
|
|
cursor.execute(f"""
|
|
INSERT INTO {config.db_schema_table}
|
|
(revision, node) VALUES (%s, %s)
|
|
""", (revision, node_name))
|
|
config.db_conn.commit()
|
|
config._script_was_applied = True
|
|
else:
|
|
sql = ''
|
|
if is_db_role and check_column_exists('site_id'):
|
|
sql = c_str(f"""
|
|
INSERT INTO {config.db_schema_table}
|
|
(revision, node)
|
|
VALUES
|
|
({revision}, '{node_name}')
|
|
""")
|
|
else:
|
|
sql = c_str(f"""
|
|
INSERT INTO {config.db_schema_table}
|
|
(revision, node, site_id)
|
|
VALUES
|
|
({revision}, '{node_name}', {site_id})
|
|
""")
|
|
config._temp_sql_file.write(
|
|
str(f'\n; {sql}; COMMIT;\n').encode()
|
|
)
|
|
config._post_set_release_for_revisions.append(revision)
|
|
|
|
if revision not in db_schema:
|
|
db_schema[revision] = {}
|
|
if site_id not in db_schema[revision]:
|
|
db_schema[revision][site_id] = []
|
|
|
|
db_schema[revision][site_id].append(node_name)
|
|
|
|
if not_replicated:
|
|
config._not_replicated_was_applied = True
|
|
except pymysql.Error as e:
|
|
error(f'could not apply revision: {e}')
|
|
raise RevisionApplyError(e)
|
|
return 1
|
|
|
|
|
|
def apply_down_script(script_name: str, revision: int) -> int:
|
|
"""Applies .down script.
|
|
|
|
Performs various checks to determins if the .down script
|
|
should be applied or skipped.
|
|
|
|
The script is skipped if:
|
|
- it is 'replicated' and not present in db_schema[revision]
|
|
- it is 'not_replicaed' and not present in
|
|
db_schema[revision][node_name]
|
|
|
|
The script and further scripts are stopped if:
|
|
- config.to_revision > 0 and to_revision <= config.to_revision
|
|
|
|
Calls run_cmd() to apply the script file using 'mysql' consoel
|
|
|
|
Upon success, removes the corresponding entry from ngcp.db_schema
|
|
|
|
If batch mode is enabled, instead of calling run_cmd() and insert
|
|
into the db_schema table applied revisions, all the data is accumulated
|
|
into config._temp_sql_file, to be then applied all at once in
|
|
apply_revisions() all at once
|
|
|
|
Note: down script 15800 is automatically skipped as it contains
|
|
addition of the 'release' table, which is crucial for downgrades.
|
|
|
|
Args:
|
|
script_name (str): script name (excluding the base dir)
|
|
revision (int): revision number
|
|
|
|
Returns
|
|
int: 1 - if successfully applied or skipped,
|
|
0 - if should be stopped and future revisions will not
|
|
be applied
|
|
|
|
"""
|
|
node_name = config.node_name
|
|
db_schema = config.db_schema
|
|
|
|
not_replicated = 'not_replicated' in script_name
|
|
revision_in_db_schema = False
|
|
node_in_db_schema = False
|
|
site_id = int(config.node_roles.get('ngcp_site_id') or 1)
|
|
common_site_id = 0
|
|
|
|
if revision in db_schema:
|
|
revision_in_db_schema = True
|
|
if site_id in db_schema[revision]:
|
|
if node_name in db_schema[revision][site_id]:
|
|
node_in_db_schema = True
|
|
if not node_in_db_schema and common_site_id in db_schema[revision]:
|
|
if node_name in db_schema[revision][common_site_id]:
|
|
node_in_db_schema = True
|
|
site_id = 0
|
|
|
|
to_revision = getattr(config, 'to_revision', 0)
|
|
if to_revision > 0 and revision <= config.to_revision:
|
|
log(f'stopping as next revision {revision} ' +
|
|
f'is lesser or equal to {config.to_revision}')
|
|
return 0
|
|
|
|
if revision == 15800:
|
|
log(f'skip non-removable revision {script_name}')
|
|
return 1
|
|
|
|
if not revision_in_db_schema or \
|
|
not_replicated and not node_in_db_schema:
|
|
debug(f'skip already removed revision {script_name}')
|
|
return 1
|
|
|
|
script_filename = f'{config.db_scripts_dir}/{script_name}'
|
|
|
|
if not config.batch_mode:
|
|
log(f'Removing revision {script_name}')
|
|
_, stderr = run_cmd(
|
|
'/usr/bin/mysql', *config._run_cmd_mysql_options,
|
|
'-e', f'source {script_filename}'
|
|
)
|
|
if stderr:
|
|
error(
|
|
f'could not remove revision {script_name}: {stderr.decode()}'
|
|
)
|
|
shutdown(1)
|
|
else:
|
|
log(f'adding revision {script_name}')
|
|
with open(script_filename, 'rb') as file:
|
|
config._temp_sql_file.write(file.read())
|
|
|
|
try:
|
|
config.db_conn.ping(reconnect=True)
|
|
if not config.batch_mode:
|
|
with config.db_conn.cursor() as cursor:
|
|
rows = 0
|
|
if not_replicated:
|
|
if site_id and check_column_exists('site_id'):
|
|
rows = cursor.execute(f"""
|
|
DELETE FROM {config.db_schema_table}
|
|
WHERE revision = %s
|
|
AND node = %s
|
|
AND site_id = %s
|
|
""", (revision, node_name, site_id))
|
|
else:
|
|
rows = cursor.execute(f"""
|
|
DELETE FROM {config.db_schema_table}
|
|
WHERE revision = %s
|
|
AND node = %s
|
|
""", (revision, node_name))
|
|
else:
|
|
rows = cursor.execute(f"""
|
|
DELETE FROM {config.db_schema_table}
|
|
WHERE revision = %s
|
|
""", revision)
|
|
if rows <= 0:
|
|
raise ValueError(script_name)
|
|
config.db_conn.commit()
|
|
config._script_was_applied = True
|
|
else:
|
|
sql = c_str(f"""
|
|
DELETE FROM {config.db_schema_table}
|
|
WHERE revision = {revision}
|
|
""")
|
|
if not_replicated:
|
|
sql += f" AND node = '{node_name}'"
|
|
if site_id and check_column_exists('site_id'):
|
|
sql += f" AND site_id = '{site_id}'"
|
|
|
|
config._temp_sql_file.write(
|
|
str(f'\n; {sql}; COMMIT;\n').encode()
|
|
)
|
|
|
|
if revision in db_schema:
|
|
if not_replicated:
|
|
db_schema[revision][site_id].remove(node_name)
|
|
if not db_schema[revision][site_id]:
|
|
del db_schema[revision][site_id]
|
|
else:
|
|
del db_schema[revision]
|
|
|
|
if not_replicated:
|
|
config._not_replicated_was_applied = True
|
|
except pymysql.Error as e:
|
|
error(f'could not apply revision {script_name}: {e}')
|
|
raise RevisionApplyError(e)
|
|
except ValueError as e:
|
|
error(f'could not remove revision {e}')
|
|
raise RevisionApplyError(e)
|
|
|
|
return 1
|
|
|
|
|
|
def post_set_release_for_applied_revisions() -> None:
|
|
"""Sets release column for applied revisions.
|
|
|
|
Sets release column for applied revisions
|
|
|
|
Args:
|
|
None
|
|
|
|
Returns
|
|
None
|
|
|
|
"""
|
|
if config.mode != 'up':
|
|
return
|
|
if not config._post_set_release_for_revisions:
|
|
return
|
|
if not check_column_exists('release'):
|
|
return
|
|
|
|
release = getattr(config, 'set_release', config.ngcp_version)
|
|
cursor: pymysql.cursors.Cursor = config.db_conn.cursor()
|
|
|
|
cursor.execute(f"""
|
|
UPDATE {config.db_schema_table}
|
|
SET `release` = '{release}',
|
|
applied_at = applied_at
|
|
WHERE revision IN ({
|
|
','.join(str(x) for x in config._post_set_release_for_revisions)
|
|
})
|
|
""")
|
|
|
|
cursor.close()
|
|
config.db_conn.commit()
|
|
|
|
|
|
def sync_ro_db() -> None:
|
|
"""Synchronises R/O database.
|
|
|
|
This function is called after successful database upgrade/downgrade
|
|
operations with the .up/.down scripts
|
|
|
|
It calls ngcp-sync-db script with specific options to re-establish
|
|
master-slave replication from the central db* nodes
|
|
|
|
It is only called on:
|
|
- carrier platform
|
|
- proxy nodes
|
|
|
|
Args:
|
|
None
|
|
|
|
Returns
|
|
None
|
|
|
|
"""
|
|
node_roles = config.node_roles
|
|
ngcp_type = node_roles['ngcp_type']
|
|
is_proxy_role = node_roles.get('ngcp_is_proxy') is True
|
|
|
|
if ngcp_type != 'carrier':
|
|
debug('skip sync R/O database on non "carrier" nodes')
|
|
return
|
|
|
|
if not is_proxy_role:
|
|
debug(c_str("""
|
|
skip R/O database sync because this node
|
|
does not have the 'proxy' role.
|
|
"""))
|
|
return
|
|
|
|
if getattr(config, 'skip_ro_db_sync', False):
|
|
debug(c_str("""
|
|
skip sync R/O database preparation as
|
|
'skip_ro_db_sync' option is provided
|
|
"""))
|
|
return
|
|
|
|
if getattr(config, 'force_ro_db_sync', False):
|
|
debug(c_str("""
|
|
force sync R/O database preparation as
|
|
'force_ro_db_sync' option is provided
|
|
"""))
|
|
elif config._not_replicated_was_applied:
|
|
debug(c_str("""
|
|
at least one not_replicated script was applied,
|
|
preparing to sync R/O database.'
|
|
"""))
|
|
else:
|
|
debug(c_str("""
|
|
skip sync R/O database preparation as
|
|
'not_replicated' scripts were not applied
|
|
"""))
|
|
return
|
|
|
|
log('preparing RO db sync...')
|
|
|
|
sync_dbs_opt = config.sync_db_databases
|
|
ignore_tables_opt = config.sync_db_ignore_tables
|
|
|
|
db_ro_socket = config.db_ro_socket
|
|
db_defaults_conf_file = config.db_defaults_conf_file
|
|
|
|
extra_sync_opt = []
|
|
if config.automated:
|
|
extra_sync_opt += ['--set-local-grants']
|
|
|
|
db_user = ''
|
|
db_pass = ''
|
|
try:
|
|
with open(db_defaults_conf_file) as file:
|
|
for line in file.readlines():
|
|
m = re.match(r'^user\s+=\s+(\S+)', line)
|
|
if m:
|
|
db_user = m.group(1)
|
|
m = re.match(r'^password\s+=\s+(\S+)', line)
|
|
if m:
|
|
db_pass = m.group(1)
|
|
except FileNotFoundError as e:
|
|
error(f'cannot access {db_defaults_conf_file}: {e}')
|
|
shutdown(1)
|
|
|
|
socket_available = os.access(str(db_ro_socket), os.R_OK)
|
|
if not socket_available:
|
|
error(f'socket {db_ro_socket} is not available')
|
|
shutdown(1)
|
|
|
|
if not (db_user and db_pass):
|
|
error(f'could not fetch credentials from {db_defaults_conf_file}')
|
|
shutdown(1)
|
|
|
|
debug(f'set privileges for user={db_user} socket={db_ro_socket}')
|
|
try:
|
|
run_cmd_mysql_options = ['-u', 'root', '-S', db_ro_socket]
|
|
_, stderr = run_cmd(
|
|
'/usr/bin/mysql', *run_cmd_mysql_options,
|
|
'-e', c_str(f"""
|
|
grant all privileges
|
|
on *.*
|
|
to '{db_user}'@'localhost' identified by '{db_pass}'
|
|
with grant option;
|
|
flush privileges;
|
|
""")
|
|
)
|
|
if stderr:
|
|
raise ChildProcessError(stderr.decode())
|
|
except ChildProcessError as e:
|
|
error(f'could not set privileges for user={db_user}: {e}')
|
|
shutdown(1)
|
|
|
|
master_node_pref = ''
|
|
if config.ngcp_upgrade:
|
|
central_dbhost = config.node_dbconf['central_dbhost']
|
|
if central_dbhost == 'db01':
|
|
node_side = ''
|
|
is_a_side = re.search(r'\d{2}a$', config.node_name)
|
|
is_b_side = re.search(r'\d{2}b$', config.node_name)
|
|
if is_a_side or config.node_name == 'sp1':
|
|
node_side = 'a'
|
|
elif is_b_side or config.node_name == 'sp2':
|
|
node_side = 'b'
|
|
else:
|
|
error('could not determine valid node side during upgrade')
|
|
shutdown(1)
|
|
|
|
master_node_pref = f'{central_dbhost}{node_side}'
|
|
elif central_dbhost in ['db01a', 'db01b']:
|
|
master_node_pref = str(central_dbhost)
|
|
|
|
if master_node_pref:
|
|
extra_sync_opt += ['--master-host', master_node_pref]
|
|
else:
|
|
error(c_str("""
|
|
could not determine valid master db node during upgrade
|
|
expected: db01, db01a, db01b
|
|
"""))
|
|
shutdown(1)
|
|
|
|
local_dbhost = config.node_dbconf['local_dbhost']
|
|
local_dbport = config.node_dbconf['local_dbport']
|
|
|
|
cmd: List[str | int] = [config.ngcp_sync_db_script,
|
|
'--verbose',
|
|
'--force',
|
|
'--use-central-db',
|
|
'--repl-mode', 'master-slave',
|
|
'--databases', sync_dbs_opt,
|
|
'--ignore-tables', ignore_tables_opt,
|
|
'--ssh-tunnel', '33125',
|
|
*extra_sync_opt,
|
|
'--local-host', local_dbhost,
|
|
'--local-port', local_dbport]
|
|
try:
|
|
debug(f'run_cmd as: {cmd}')
|
|
log('running ngcp-sync-db...')
|
|
stdout, stderr = run_cmd(*tuple(cmd))
|
|
log(stdout.decode())
|
|
if stderr:
|
|
raise ChildProcessError(stderr.decode())
|
|
except ChildProcessError as e:
|
|
error(f'could not complete ngcp-sync-db: {e}')
|
|
shutdown(1)
|
|
|
|
|
|
def check_active_node() -> None:
|
|
"""Checks if the current node is active.
|
|
|
|
Stops the script if the current node is active and the --force
|
|
argument is not provided
|
|
This function always succeeds on CE platforms
|
|
|
|
Args:
|
|
None
|
|
|
|
Returns
|
|
None
|
|
|
|
"""
|
|
ngcp_type = config.node_roles['ngcp_type']
|
|
|
|
if ngcp_type == 'spce':
|
|
return
|
|
|
|
node_state = get_node_state()
|
|
|
|
if node_state != 'standby':
|
|
if not config.force:
|
|
log(c_str(f"""
|
|
stopping because node {config.node_name} state is {node_state}
|
|
"""))
|
|
shutdown(1)
|
|
else:
|
|
debug(c_str(
|
|
f"""current node {config.node_name} state is {node_state}
|
|
but force mode is enabled."""
|
|
))
|
|
|
|
|
|
def check_and_show_prompt() -> None:
|
|
"""Checks and shows the db init agreement prompt.
|
|
|
|
If the db_schema is not yet initialised and --automated mode
|
|
is not set, a prompt is shown that expects 'agree' confirmation
|
|
from the user, otherwise the script is aborted
|
|
|
|
Args:
|
|
None
|
|
|
|
Returns
|
|
None
|
|
|
|
"""
|
|
if config.mode == 'down' and not config.db_schema:
|
|
shutdown(1)
|
|
|
|
if not config.db_schema and not config.automated:
|
|
prompt = """
|
|
=================================================================
|
|
Warning: the db_schema table of the ngcp database is empty.
|
|
Are you sure you want to proceed with applying db-schema changes ?
|
|
This will DROP and then re-initialize your existing database.
|
|
Please type 'agree' and press Enter to really continue: """
|
|
prompt = prompt.strip()
|
|
prompt = '\n'.join([line.strip() for line in prompt.splitlines()])
|
|
prompt += ' '
|
|
config._prompt_is_shown = True
|
|
response = input(prompt)
|
|
if response != 'agree':
|
|
log('Aborted.')
|
|
shutdown(0)
|
|
|
|
|
|
def show_release_info() -> None:
|
|
"""Shows releases info.
|
|
|
|
Args:
|
|
None
|
|
|
|
Returns
|
|
None
|
|
|
|
"""
|
|
if not check_column_exists('release'):
|
|
error('Cannot use get release info on the old database version')
|
|
shutdown(1)
|
|
|
|
cursor: pymysql.cursors.Cursor = config.db_conn.cursor()
|
|
|
|
cursor.execute(f"""
|
|
SELECT `release`, min(revision), max(revision), COUNT(revision)
|
|
FROM {config.db_schema_table}
|
|
GROUP BY 1
|
|
""")
|
|
|
|
if config.as_json:
|
|
data: Dict[str, Dict[str, int]] = {}
|
|
for row in cursor:
|
|
release = row[0]
|
|
min_rev = row[1]
|
|
max_rev = row[2]
|
|
count = row[3]
|
|
data[release] = {
|
|
'min_revision': min_rev,
|
|
'max_revision': max_rev,
|
|
'count': count
|
|
}
|
|
print(json.dumps(data, sort_keys=True))
|
|
else:
|
|
table = PrettyTable(['release',
|
|
'min_revision', 'max_revision',
|
|
'count'])
|
|
table.align = 'r'
|
|
for row in cursor:
|
|
table.add_row(row)
|
|
print(table)
|
|
|
|
|
|
def show_start_summary() -> None:
|
|
"""Print options that the script is invoked with.
|
|
|
|
Args:
|
|
None
|
|
|
|
Returns
|
|
None
|
|
|
|
"""
|
|
log(f'{os.path.basename(sys.argv[0])} started with the following options:')
|
|
log('-----------------------------------')
|
|
for arg in config.supported_args:
|
|
if hasattr(config, arg):
|
|
v = getattr(config, arg)
|
|
log(f'{arg}: {v}')
|
|
log('-----------------------------------')
|
|
log('node_name: ' + config.node_name)
|
|
node_roles = config.node_roles
|
|
if node_roles.get('ngcp_is_multi_site'):
|
|
site_id = str(node_roles.get('ngcp_site_id') or 1)
|
|
log('site_id: ' + site_id)
|
|
log('node_state: ' + config.node_state)
|
|
log('ngcp_version: ' + config.ngcp_version)
|
|
log('-----------------------------------')
|
|
|
|
|
|
def shutdown(retcode: int) -> None:
|
|
"""Called when the script needs to be manually stopped.
|
|
|
|
Args:
|
|
ret (int): return code, 0 - ok
|
|
1 - error
|
|
|
|
Returns
|
|
None
|
|
|
|
"""
|
|
sys.exit(retcode)
|
|
|
|
|
|
def main() -> None:
|
|
"""Main entry function.
|
|
|
|
Args:
|
|
None
|
|
|
|
Returns
|
|
None
|
|
|
|
"""
|
|
try:
|
|
started_at = time.time()
|
|
global config
|
|
config = Config()
|
|
parse_args()
|
|
config.setup_config()
|
|
if config.get_release_info:
|
|
config.setup_db()
|
|
show_release_info()
|
|
else:
|
|
show_start_summary()
|
|
config.setup_db()
|
|
check_active_node()
|
|
check_and_show_prompt()
|
|
revisions = get_revisions()
|
|
apply_revisions(revisions)
|
|
sync_ro_db()
|
|
log('done in %.2fs.' % (time.time()-started_at))
|
|
shutdown(0)
|
|
except RuntimeError as err: # noqa: B902
|
|
error(str(err))
|
|
shutdown(1)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|