MT#62899 Unify python coding style

- Add pyproject.toml and tox.ini files.
- Run black on Debian trixie.

Change-Id: Ifeef06e15d6b6a63fb7581487ceecf22fd646e9f
master
Guillem Jover 4 months ago
parent f6f1c9addb
commit c019cb15b0

@ -34,23 +34,22 @@ except ImportError:
class Test:
""" Class to create TAP output """
"""Class to create TAP output"""
def __init__(self):
self._step = []
self._errflag = False
def comment(self, msg):
""" Add a comment """
"""Add a comment"""
self._step.append({"result": None, "msg_ok": msg})
def ok(self, msg=None):
""" Add a ok result """
"""Add a ok result"""
self._step.append({"result": True, "msg_ok": msg})
def error(self, msg_err):
""" Add an error result"""
"""Add an error result"""
self._step.append({"result": False, "msg_err": msg_err})
self._errflag = True
@ -87,7 +86,7 @@ class Test:
return result
def test(self, value_expected, value, msg_err, msg_ok=None):
""" Test two values and add the result"""
"""Test two values and add the result"""
result = Test.compare(value_expected, value)
self._step.append(
{"result": result, "msg_err": msg_err, "msg_ok": msg_ok}
@ -165,9 +164,7 @@ def check_cdr_recursive(scen, msgs, test):
found = False
for message in msgs:
test.comment("comparing with cdr id {}".format(message["id"]))
valid, comments, oks = check_single_cdr_recursive(
scen[i], message
)
valid, comments, oks = check_single_cdr_recursive(scen[i], message)
for msg in comments:
test.comment(msg)
# if expected and result CDRs fully matches -> exit loop
@ -256,19 +253,11 @@ def main(args):
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="generate TAP result for CDR"
)
parser = argparse.ArgumentParser(description="generate TAP result for CDR")
grp = parser.add_mutually_exclusive_group()
grp.add_argument(
"-y", "--yaml", action="store_true", help="YAML cdr_file"
)
grp.add_argument(
"-j", "--json", action="store_true", help="JSON cdr_file"
)
grp.add_argument(
"-t", "--text", action="store_true", help="TEXT cdr_file"
)
grp.add_argument("-y", "--yaml", action="store_true", help="YAML cdr_file")
grp.add_argument("-j", "--json", action="store_true", help="JSON cdr_file")
grp.add_argument("-t", "--text", action="store_true", help="TEXT cdr_file")
parser.add_argument("yml_file", help="YAML file with checks")
parser.add_argument("cdr_file", help="CDR file")
parser.add_argument("-d", "--debug", action="store_true")

@ -35,7 +35,6 @@ except ImportError:
class XAvp:
"""Class to simulate the xavp"""
def __init__(self, name, data):
@ -111,7 +110,6 @@ class Section(Flag):
class CTest:
"""Class to create TAP output"""
def __init__(self):

@ -40,23 +40,22 @@ class Section(Flag):
class Test:
""" Class to create TAP output """
"""Class to create TAP output"""
def __init__(self):
self._step = []
self._errflag = Section(0)
def comment(self, msg):
""" Add a comment """
"""Add a comment"""
self._step.append({"result": None, "msg_ok": msg})
def ok(self, msg=None):
""" Add a ok result """
"""Add a ok result"""
self._step.append({"result": True, "msg_ok": msg})
def error(self, section, msg_err):
""" Add an error result"""
"""Add an error result"""
self._step.append({"result": False, "msg_err": msg_err})
self._errflag |= section
@ -92,7 +91,7 @@ class Test:
return result
def test(self, section, value_expected, value, msg_err, msg_ok=None):
""" Test two values and add the result"""
"""Test two values and add the result"""
result = Test.compare(value_expected, value)
val = {"result": result, "msg_err": msg_err, "msg_ok": msg_ok}
self._step.append(val)

@ -24,7 +24,7 @@ import os
import sys
import xmlrpc.client
KAM_URL = 'http://127.0.0.1:5062'
KAM_URL = "http://127.0.0.1:5062"
KAM_LINES = 10
proxy = xmlrpc.client.ServerProxy(KAM_URL)
@ -38,25 +38,26 @@ def get_headers(line, prefix):
def sum_row(s, r):
for i in ['used', 'real_used', 'free']:
for i in ["used", "real_used", "free"]:
s[i] = s[i] + r[i]
def save_data(datafile, headers, prefix, res):
show_headers = get_headers(headers, prefix)
with open(datafile, 'w+') as csvfile:
with open(datafile, "w+") as csvfile:
spamwriter = csv.writer(csvfile)
spamwriter.writerow(show_headers)
spamwriter = csv.DictWriter(csvfile,
headers, extrasaction='ignore')
spamwriter = csv.DictWriter(csvfile, headers, extrasaction="ignore")
for row in res:
spamwriter.writerow(row)
def get_pvm(private_file):
res = [{'pid': 'SUM', 'used': 0, 'real_used': 0, 'free': 0}, ]
headers = ['pid', 'used', 'real_used', 'free']
prefix = 'rss_'
res = [
{"pid": "SUM", "used": 0, "real_used": 0, "free": 0},
]
headers = ["pid", "used", "real_used", "free"]
prefix = "rss_"
try:
for i in range(KAM_LINES):
row = proxy.pkg.stats("index", i)[0]
@ -76,17 +77,15 @@ def get_pvm(private_file):
sys.exit(-2)
save_data(private_file, headers, prefix, res[:1])
fileName, fileExtension = os.path.splitext(private_file)
private_file_pp = '%s_per_pid%s' % (fileName, fileExtension)
private_file_pp = "%s_per_pid%s" % (fileName, fileExtension)
save_data(private_file_pp, headers, prefix, res[2:])
def get_shm(share_file):
headers = ['used', 'real_used', 'max_used',
'free', 'fragments', 'total'
]
prefix = 'shared_'
headers = ["used", "real_used", "max_used", "free", "fragments", "total"]
prefix = "shared_"
try:
res = proxy.core.shmmem('b')
res = proxy.core.shmmem("b")
except xmlrpc.client.Fault as err:
print("A fault occurred")
print("Fault code: %d" % err.faultCode)
@ -99,18 +98,40 @@ def get_shm(share_file):
print("Error code: %d" % err.errcode)
print("Error message: %s" % err.errmsg)
sys.exit(-2)
save_data(share_file, headers, prefix, [res, ])
save_data(
share_file,
headers,
prefix,
[
res,
],
)
def main():
parser = argparse.ArgumentParser(
description='export to csv kamailio proxy stats.')
parser.add_argument('--private_file', '-P', nargs='?', default='pvm.csv',
help='path to the private csv file')
parser.add_argument('--share_file', '-S', nargs='?', default='shm.csv',
help='path to the share csv file')
parser.add_argument('--rpc_url', nargs='?', default=KAM_URL,
help='rpc URL of kamailio. Default:%s' % KAM_URL)
description="export to csv kamailio proxy stats."
)
parser.add_argument(
"--private_file",
"-P",
nargs="?",
default="pvm.csv",
help="path to the private csv file",
)
parser.add_argument(
"--share_file",
"-S",
nargs="?",
default="shm.csv",
help="path to the share csv file",
)
parser.add_argument(
"--rpc_url",
nargs="?",
default=KAM_URL,
help="rpc URL of kamailio. Default:%s" % KAM_URL,
)
args = parser.parse_args()
get_pvm(args.private_file)

@ -28,7 +28,6 @@ import sys
import getopt
from scapy.all import PcapReader, wrpcap, Packet, NoPayload
output_file = None
input_files = []
complete_diff = False
@ -52,7 +51,8 @@ show_diffs = False
def usage():
print(sys.argv[0])
print("""
print(
"""
-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
Diff two or more pcap files
Programmed by Bastian Ballmann <bastian.ballmann@inf.ethz.ch>
@ -83,7 +83,8 @@ def usage():
Example usage with ignore mac addresses
pcap_diff.py -i client.dump -i server.dump -o diff.pcap -f m
""")
"""
)
sys.exit(1)
@ -143,7 +144,7 @@ if len(input_files) < 2:
sys.exit(1)
def flatten(d, parent_key=''):
def flatten(d, parent_key=""):
"""
Flatten a packet to a dict
Remove checksums (can be different due to calculation in netdev firmware)
@ -151,17 +152,26 @@ def flatten(d, parent_key=''):
items = []
# skip scapy internal fields
skip_fields = ['fieldtype', 'underlayer', 'initialized', 'fieldtype',
'default_fields', 'aliastypes', 'post_transforms',
'packetfields', 'overloaded_fields', 'sent_time']
hasPayload = 'payload' in d
skip_fields = [
"fieldtype",
"underlayer",
"initialized",
"fieldtype",
"default_fields",
"aliastypes",
"post_transforms",
"packetfields",
"overloaded_fields",
"sent_time",
]
hasPayload = "payload" in d
for k, v in d.items():
fullk = "%s_%s" % (parent_key, k)
# ignore the original if we have payload.The payload will be expanded
if hasPayload and k == 'original':
if hasPayload and k == "original":
continue
# No complete diff? Ignore checksum, ttl and time
@ -204,7 +214,7 @@ def flatten(d, parent_key=''):
if fullk in ignore_headers:
continue
new_key = parent_key + '_' + k if parent_key else k
new_key = parent_key + "_" + k if parent_key else k
# payload is Packet or str
# stop at NoPayload payload

@ -1,22 +1,22 @@
#!/usr/bin/python3
"""
Copyright: 2014 Sipwise Development Team <support@sipwise.com>
Copyright: 2014 Sipwise Development Team <support@sipwise.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This package is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
This package is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
On Debian systems, the complete text of the GNU General
Public License version 3 can be found in "/usr/share/common-licenses/GPL-3".
On Debian systems, the complete text of the GNU General
Public License version 3 can be found in "/usr/share/common-licenses/GPL-3".
"""
import logging
import os
@ -27,11 +27,12 @@ import argparse
import signal
BASE_DIR = "/usr/share/kamailio-config-tests"
if 'BASE_DIR' in os.environ:
BASE_DIR = os.environ['BASE_DIR']
filelog = os.path.join(BASE_DIR, 'log', 'pid_watcher.log')
if "BASE_DIR" in os.environ:
BASE_DIR = os.environ["BASE_DIR"]
filelog = os.path.join(BASE_DIR, "log", "pid_watcher.log")
logging.basicConfig(
filename=filelog, level=logging.DEBUG, format='%(asctime)s %(message)s')
filename=filelog, level=logging.DEBUG, format="%(asctime)s %(message)s"
)
base_path = "/run"
@ -39,17 +40,13 @@ base_path = "/run"
files we want to watch
"""
services = [
'kamailio/kamailio.lb.pid',
'kamailio/kamailio.proxy.pid',
'sems-b2b/sems-b2b.pid',
'ngcp-witnessd.pid'
"kamailio/kamailio.lb.pid",
"kamailio/kamailio.proxy.pid",
"sems-b2b/sems-b2b.pid",
"ngcp-witnessd.pid",
]
watched_dirs = [
'/run/kamailio',
'/run/sems-b2b',
'/run'
]
watched_dirs = ["/run/kamailio", "/run/sems-b2b", "/run"]
watched = {}
@ -71,28 +68,30 @@ class Handler(pyinotify.ProcessEvent):
def check_all(self):
all = True
for k, v in self.watched.items():
all = (all and (v['created'] or v['modified']))
all = all and (v["created"] or v["modified"])
logging.info("checking: %s[%s] all:%s" % (k, v, all))
return all
def process_IN_CREATE(self, event):
if event.pathname in watched:
watched[event.pathname]['created'] = True
watched[event.pathname]["created"] = True
logging.info("created %s" % event.pathname)
if self.check_all():
self.check_done()
def process_IN_IGNORED(self, event):
if event.pathname in watched:
watched[event.pathname]['deleted'] = True
watched[event.pathname]["deleted"] = True
logging.info("deleted %s" % event.pathname)
def process_IN_MODIFY(self, event):
if event.pathname in watched:
watched[event.pathname]['modified'] = True
watched[event.pathname]["modified"] = True
logging.info("modified %s" % event.pathname)
if self.check_all():
self.check_done()
# for debug
# def process_default(self, event):
# if watched.has_key(event.pathname):
@ -103,18 +102,20 @@ class Handler(pyinotify.ProcessEvent):
signal.signal(signal.SIGTERM, sigterm_handler)
parser = argparse.ArgumentParser(
description='watch some pids to detect restarts')
parser.add_argument('--pbx', dest='pbx', action='store_true',
help='pbx is enabled')
description="watch some pids to detect restarts"
)
parser.add_argument(
"--pbx", dest="pbx", action="store_true", help="pbx is enabled"
)
args = parser.parse_args()
if args.pbx:
watched_dirs += [
'/run/fastcgi',
"/run/fastcgi",
]
services += [
'fastcgi/ngcp-panel.pid',
"fastcgi/ngcp-panel.pid",
]
logging.info("PID watcher started")
@ -125,7 +126,10 @@ for service in services:
service_pid = os.path.join(base_path, service)
logging.info("Watching %s" % service_pid)
watched[service_pid] = {
'deleted': False, 'created': False, 'modified': False}
"deleted": False,
"created": False,
"modified": False,
}
wm.add_watch(service_pid, pyinotify.IN_IGNORED)
for d in watched_dirs:
wm.add_watch(d, pyinotify.ALL_EVENTS)

@ -1,6 +1,10 @@
[tool.black]
target-version = ['py37']
# Match line-length expected by pycodestyle.
line-length = 79
# Current version in Debian trixie.
target-version = ['py313']
# Enable preview features, expected by pycodestyle.
preview = true
include = '\.pyi?$'
exclude = '''
/(

@ -1,2 +1,3 @@
import pytest
pytest_plugins = ['fixtures.programs']
pytest_plugins = ["fixtures.programs"]

@ -23,7 +23,6 @@ import sys
import logging
from collections import namedtuple
lib_path = os.path.abspath("bin")
sys.path.append(lib_path)

@ -32,11 +32,10 @@ except ImportError:
class ParametrizedTestCase(unittest.TestCase):
""" TestCase classes that want to be parametrized should
inherit from this class.
http://eli.thegreenplace.net/
2011/08/02/python-unit-testing-parametrized-test-cases
"""TestCase classes that want to be parametrized should
inherit from this class.
http://eli.thegreenplace.net/
2011/08/02/python-unit-testing-parametrized-test-cases
"""
def __init__(self, methodName="runTest", param=None):
@ -52,8 +51,8 @@ class ParametrizedTestCase(unittest.TestCase):
@staticmethod
def parametrize(testcase_klass, param=None):
""" Create a suite containing all tests taken from the given
subclass, passing them the parameter 'param'.
"""Create a suite containing all tests taken from the given
subclass, passing them the parameter 'param'.
"""
testloader = unittest.TestLoader()
testnames = testloader.getTestCaseNames(testcase_klass)

@ -0,0 +1,2 @@
[pycodestyle]
max-line-length = 79
Loading…
Cancel
Save