Change-Id: Ib80102130ad943d89253dc5682614080cb4b2beechanges/76/25676/10
parent
a6619bfb18
commit
a50af1ca1f
@ -0,0 +1,160 @@
|
||||
package NGCP::BulkProcessor::FileProcessors::XslxFileSimple;
|
||||
use strict;
|
||||
|
||||
## no critic
|
||||
|
||||
use Excel::Reader::XLSX; qw();
|
||||
|
||||
use NGCP::BulkProcessor::Logging qw(
|
||||
getlogger
|
||||
fileprocessingstarted
|
||||
fileprocessingdone
|
||||
lines_read
|
||||
processing_lines
|
||||
);
|
||||
|
||||
use NGCP::BulkProcessor::LogError qw(
|
||||
fileprocessingfailed
|
||||
fileerror
|
||||
);
|
||||
|
||||
use NGCP::BulkProcessor::Utils qw(threadid);
|
||||
|
||||
use NGCP::BulkProcessor::FileProcessor qw(create_process_context);
|
||||
|
||||
require Exporter;
|
||||
our @ISA = qw(Exporter NGCP::BulkProcessor::FileProcessor);
|
||||
our @EXPORT_OK = qw();
|
||||
|
||||
my $default_sheet_name = undef; #'~';
|
||||
my $blocksize = 100;
|
||||
|
||||
sub new {
|
||||
|
||||
my $class = shift;
|
||||
|
||||
my $self = NGCP::BulkProcessor::FileProcessor->new(@_);
|
||||
|
||||
#$self->{numofthreads} = shift // $default_numofthreads;
|
||||
$self->{custom_formats} = shift;
|
||||
$self->{sheet_name} = shift // $default_sheet_name;
|
||||
$self->{header_row} = shift // 0;
|
||||
$self->{blocksize} = $blocksize;
|
||||
|
||||
bless($self,$class);
|
||||
|
||||
return $self;
|
||||
|
||||
}
|
||||
|
||||
sub process {
|
||||
|
||||
my $self = shift;
|
||||
|
||||
my %params = @_;
|
||||
my ($file,
|
||||
$process_code,
|
||||
$static_context,
|
||||
$init_process_context_code,
|
||||
$uninit_process_context_code) = @params{qw/
|
||||
file
|
||||
process_code
|
||||
static_context
|
||||
init_process_context_code
|
||||
uninit_process_context_code
|
||||
/};
|
||||
|
||||
fileprocessingstarted($file,getlogger(__PACKAGE__));
|
||||
my $result = 0;
|
||||
my $tid = threadid();
|
||||
my $context = create_process_context($static_context,{ instance => $self,
|
||||
filename => $file,
|
||||
tid => $tid,
|
||||
});
|
||||
eval {
|
||||
my $reader = Excel::Reader::XLSX->new();
|
||||
my $workbook = $reader->read_file($file);
|
||||
#my $workbook = Spreadsheet::Reader::ExcelXML->new($file);
|
||||
# file => $file,
|
||||
# #group_return_type => 'value',
|
||||
# count_from_zero => 0,
|
||||
# values_only => 1,
|
||||
# empty_is_end => 1,
|
||||
# group_return_type => ('HASH' eq ref $self->{custom_formats} ? 'value' : 'xml_value'),
|
||||
# from_the_edge => 0,
|
||||
# empty_return_type => 'undef_string',
|
||||
# spaces_are_empty => 1,
|
||||
# merge_data => 0,
|
||||
# column_formats => 0,
|
||||
#);
|
||||
if (defined $init_process_context_code and 'CODE' eq ref $init_process_context_code) {
|
||||
&$init_process_context_code($context);
|
||||
}
|
||||
if (not defined $workbook) {
|
||||
fileerror('processing file - error reading file ' . $file . ': ' . $reader->error(),getlogger(__PACKAGE__));
|
||||
} else {
|
||||
my $sheet;
|
||||
if ($self->{sheet_name}) {
|
||||
$sheet = $workbook->worksheet($self->{sheet_name});
|
||||
#xls2csvinfo('converting the ' . $sheet->name() . ' worksheet',getlogger(__PACKAGE__));
|
||||
} else {
|
||||
$sheet = $workbook->worksheet(0);
|
||||
#if (@{$workbook->worksheets()} > 1) {
|
||||
# xls2csvinfo('multiple worksheets found, converting ' . $sheet->name(),getlogger(__PACKAGE__));
|
||||
#}
|
||||
}
|
||||
if (not defined $sheet) {
|
||||
#fileerror('processing file - error reading file ' . $file . ': ' . $workbook->error(),getlogger(__PACKAGE__));
|
||||
fileerror('invalid spreadsheet',getlogger(__PACKAGE__));
|
||||
} else {
|
||||
$result = 1;
|
||||
#_info($context,"worksheet '" . $worksheet->get_name() . "' opened");
|
||||
|
||||
#$worksheet->set_custom_formats($self->{custom_formats}) if 'HASH' eq ref $self->{custom_formats};
|
||||
#$worksheet->set_custom_formats({
|
||||
# 2 =>'yyyy-mm-dd',
|
||||
#});
|
||||
#$worksheet->set_headers($self->{header_row}) if defined $self->{header_row};
|
||||
#if ($worksheet->header_row_set()) {
|
||||
# $worksheet->go_to_or_past_row($worksheet->get_excel_position($worksheet->get_last_header_row()));
|
||||
#}
|
||||
|
||||
my $i = 0;
|
||||
processing_lines($tid,$i,$self->{blocksize},undef,getlogger(__PACKAGE__));
|
||||
#my $value;
|
||||
my @rows = ();
|
||||
while ($result) {
|
||||
#$value = $worksheet->fetchrow_arrayref;
|
||||
my $row = $sheet->next_row();
|
||||
last unless $row; #if (not $value or 'EOF' eq $value);
|
||||
my @vals = $row->values();
|
||||
#$i++;
|
||||
#next if not ref $value;
|
||||
push(@rows,\@vals);
|
||||
if ((scalar @rows) >= $self->{blocksize}) {
|
||||
$result &= &$process_code($context,\@rows,$i);
|
||||
$i += scalar @rows;
|
||||
processing_lines($tid,$i,$self->{blocksize},undef,getlogger(__PACKAGE__));
|
||||
@rows = ();
|
||||
}
|
||||
}
|
||||
$result &= &$process_code($context,\@rows,$i);
|
||||
}
|
||||
}
|
||||
};
|
||||
$result &= 0 if $@;
|
||||
eval {
|
||||
if (defined $uninit_process_context_code and 'CODE' eq ref $uninit_process_context_code) {
|
||||
&$uninit_process_context_code($context);
|
||||
}
|
||||
};
|
||||
if ($result) {
|
||||
fileprocessingdone($file,getlogger(__PACKAGE__));
|
||||
} else {
|
||||
fileprocessingfailed($file,getlogger(__PACKAGE__));
|
||||
}
|
||||
return $result;
|
||||
|
||||
}
|
||||
|
||||
1;
|
||||
@ -0,0 +1,359 @@
|
||||
package NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber;
|
||||
use strict;
|
||||
|
||||
## no critic
|
||||
|
||||
use NGCP::BulkProcessor::Projects::Migration::UPCAT::ProjectConnectorPool qw(
|
||||
get_import_db
|
||||
destroy_all_dbs
|
||||
);
|
||||
#import_db_tableidentifier
|
||||
|
||||
use NGCP::BulkProcessor::SqlProcessor qw(
|
||||
registertableinfo
|
||||
create_targettable
|
||||
checktableinfo
|
||||
copy_row
|
||||
|
||||
insert_stmt
|
||||
|
||||
process_table
|
||||
);
|
||||
use NGCP::BulkProcessor::SqlRecord qw();
|
||||
|
||||
#use NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber qw();
|
||||
|
||||
require Exporter;
|
||||
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
|
||||
our @EXPORT_OK = qw(
|
||||
create_table
|
||||
gettablename
|
||||
check_table
|
||||
getinsertstatement
|
||||
getupsertstatement
|
||||
|
||||
findby_service_number
|
||||
countby_service_number
|
||||
|
||||
findby_switch_number
|
||||
|
||||
update_delta
|
||||
findby_delta
|
||||
countby_delta
|
||||
|
||||
$deleted_delta
|
||||
$updated_delta
|
||||
$added_delta
|
||||
|
||||
process_records
|
||||
|
||||
@fieldnames
|
||||
);
|
||||
|
||||
#findby_ccacsn
|
||||
#countby_ccacsn
|
||||
|
||||
#findby_domain_sipusername
|
||||
#findby_domain_webusername
|
||||
#list_domain_billingprofilename_resellernames
|
||||
#findby_sipusername
|
||||
#list_barring_resellernames
|
||||
|
||||
my $tablename = 'ccs_subscriber';
|
||||
my $get_db = \&get_import_db;
|
||||
|
||||
our @fieldnames = (
|
||||
"service_number",
|
||||
"switch_number",
|
||||
"icm",
|
||||
"routing_type",
|
||||
"customer",
|
||||
"target_number",
|
||||
"comment",
|
||||
|
||||
'rownum',
|
||||
);
|
||||
my $expected_fieldnames = [
|
||||
@fieldnames,
|
||||
'delta',
|
||||
];
|
||||
|
||||
# table creation:
|
||||
my $primarykey_fieldnames = [ 'service_number' ];
|
||||
my $indexes = {
|
||||
|
||||
$tablename . '_switch_number' => [ 'switch_number(12)' ],
|
||||
|
||||
$tablename . '_rownum' => [ 'rownum(11)' ],
|
||||
$tablename . '_delta' => [ 'delta(7)' ],};
|
||||
#my $fixtable_statements = [];
|
||||
|
||||
our $deleted_delta = 'DELETED';
|
||||
our $updated_delta = 'UPDATED';
|
||||
our $added_delta = 'ADDED';
|
||||
|
||||
sub new {
|
||||
|
||||
my $class = shift;
|
||||
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
|
||||
$tablename,$expected_fieldnames,$indexes);
|
||||
|
||||
copy_row($self,shift,$expected_fieldnames);
|
||||
|
||||
return $self;
|
||||
|
||||
}
|
||||
|
||||
sub create_table {
|
||||
|
||||
my ($truncate) = @_;
|
||||
|
||||
my $db = &$get_db();
|
||||
|
||||
registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames);
|
||||
return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef);
|
||||
|
||||
}
|
||||
|
||||
sub findby_delta {
|
||||
|
||||
my ($delta,$load_recursive) = @_;
|
||||
|
||||
check_table();
|
||||
my $db = &$get_db();
|
||||
my $table = $db->tableidentifier($tablename);
|
||||
|
||||
return [] unless defined $delta;
|
||||
|
||||
my $rows = $db->db_get_all_arrayref(
|
||||
'SELECT * FROM ' .
|
||||
$table .
|
||||
' WHERE ' .
|
||||
$db->columnidentifier('delta') . ' = ?'
|
||||
,$delta);
|
||||
|
||||
return buildrecords_fromrows($rows,$load_recursive);
|
||||
|
||||
}
|
||||
|
||||
sub findby_service_number {
|
||||
|
||||
my ($service_number,$load_recursive) = @_;
|
||||
|
||||
check_table();
|
||||
my $db = &$get_db();
|
||||
my $table = $db->tableidentifier($tablename);
|
||||
|
||||
return undef unless (defined $service_number);
|
||||
|
||||
my $rows = $db->db_get_all_arrayref(
|
||||
'SELECT * FROM ' .
|
||||
$table .
|
||||
' WHERE ' .
|
||||
$db->columnidentifier('service_number') . ' = ?'
|
||||
,$service_number);
|
||||
|
||||
return buildrecords_fromrows($rows,$load_recursive)->[0];
|
||||
|
||||
}
|
||||
|
||||
sub findby_switch_number {
|
||||
|
||||
my ($switch_number,$load_recursive) = @_;
|
||||
|
||||
check_table();
|
||||
my $db = &$get_db();
|
||||
my $table = $db->tableidentifier($tablename);
|
||||
|
||||
return [] unless (defined $switch_number);
|
||||
|
||||
my $rows = $db->db_get_all_arrayref(
|
||||
'SELECT * FROM ' .
|
||||
$table .
|
||||
' WHERE ' .
|
||||
$db->columnidentifier('switch_number') . ' = ?'
|
||||
,$switch_number);
|
||||
|
||||
return buildrecords_fromrows($rows,$load_recursive);
|
||||
|
||||
}
|
||||
|
||||
sub update_delta {
|
||||
|
||||
my ($service_number,$delta) = @_;
|
||||
|
||||
check_table();
|
||||
my $db = &$get_db();
|
||||
my $table = $db->tableidentifier($tablename);
|
||||
|
||||
my $stmt = 'UPDATE ' . $table . ' SET delta = ?';
|
||||
my @params = ();
|
||||
push(@params,$delta);
|
||||
if (defined $service_number) {
|
||||
$stmt .= ' WHERE ' .
|
||||
$db->columnidentifier('service_number') . ' = ?';
|
||||
push(@params,$service_number);
|
||||
}
|
||||
|
||||
return $db->db_do($stmt,@params);
|
||||
|
||||
}
|
||||
|
||||
sub countby_service_number {
|
||||
|
||||
my ($service_number) = @_;
|
||||
|
||||
check_table();
|
||||
my $db = &$get_db();
|
||||
my $table = $db->tableidentifier($tablename);
|
||||
|
||||
my $stmt = 'SELECT COUNT(*) FROM ' . $table;
|
||||
my @params = ();
|
||||
if (defined $service_number) {
|
||||
$stmt .= ' WHERE ' .
|
||||
$db->columnidentifier('service_number') . ' = ?';
|
||||
push(@params,$service_number);
|
||||
}
|
||||
|
||||
return $db->db_get_value($stmt,@params);
|
||||
|
||||
}
|
||||
|
||||
sub countby_delta {
|
||||
|
||||
my ($deltas) = @_;
|
||||
|
||||
check_table();
|
||||
my $db = &$get_db();
|
||||
my $table = $db->tableidentifier($tablename);
|
||||
|
||||
my $stmt = 'SELECT COUNT(*) FROM ' . $table . ' WHERE 1=1';
|
||||
my @params = ();
|
||||
if (defined $deltas and 'HASH' eq ref $deltas) {
|
||||
foreach my $in (keys %$deltas) {
|
||||
my @values = (defined $deltas->{$in} and 'ARRAY' eq ref $deltas->{$in} ? @{$deltas->{$in}} : ($deltas->{$in}));
|
||||
$stmt .= ' AND ' . $db->columnidentifier('delta') . ' ' . $in . ' (' . substr(',?' x scalar @values,1) . ')';
|
||||
push(@params,@values);
|
||||
}
|
||||
} elsif (defined $deltas and length($deltas) > 0) {
|
||||
$stmt .= ' AND ' . $db->columnidentifier('delta') . ' = ?';
|
||||
push(@params,$deltas);
|
||||
}
|
||||
|
||||
return $db->db_get_value($stmt,@params);
|
||||
|
||||
}
|
||||
|
||||
sub buildrecords_fromrows {
|
||||
|
||||
my ($rows,$load_recursive) = @_;
|
||||
|
||||
my @records = ();
|
||||
my $record;
|
||||
|
||||
if (defined $rows and ref $rows eq 'ARRAY') {
|
||||
foreach my $row (@$rows) {
|
||||
$record = __PACKAGE__->new($row);
|
||||
|
||||
# transformations go here ...
|
||||
|
||||
push @records,$record;
|
||||
}
|
||||
}
|
||||
|
||||
return \@records;
|
||||
|
||||
}
|
||||
|
||||
sub process_records {
|
||||
|
||||
my %params = @_;
|
||||
my ($process_code,
|
||||
$static_context,
|
||||
$init_process_context_code,
|
||||
$uninit_process_context_code,
|
||||
$multithreading,
|
||||
$numofthreads) = @params{qw/
|
||||
process_code
|
||||
static_context
|
||||
init_process_context_code
|
||||
uninit_process_context_code
|
||||
multithreading
|
||||
numofthreads
|
||||
/};
|
||||
|
||||
check_table();
|
||||
my $db = &$get_db();
|
||||
my $table = $db->tableidentifier($tablename);
|
||||
|
||||
my @cols = map { $db->columnidentifier($_); } qw/switch_number/;
|
||||
|
||||
return process_table(
|
||||
get_db => $get_db,
|
||||
class => __PACKAGE__,
|
||||
process_code => sub {
|
||||
my ($context,$rowblock,$row_offset) = @_;
|
||||
return &$process_code($context,$rowblock,$row_offset);
|
||||
},
|
||||
static_context => $static_context,
|
||||
init_process_context_code => $init_process_context_code,
|
||||
uninit_process_context_code => $uninit_process_context_code,
|
||||
destroy_reader_dbs_code => \&destroy_all_dbs,
|
||||
multithreading => $multithreading,
|
||||
tableprocessing_threads => $numofthreads,
|
||||
#'select' => 'SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols),
|
||||
'select' => $db->paginate_sort_query('SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols),undef,undef,[{
|
||||
column => 'rownum',
|
||||
numeric => 1,
|
||||
dir => 1,
|
||||
}]),
|
||||
'selectcount' => 'SELECT COUNT(*) FROM (SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols) . ') AS g',
|
||||
);
|
||||
}
|
||||
|
||||
sub getinsertstatement {
|
||||
|
||||
my ($insert_ignore) = @_;
|
||||
check_table();
|
||||
return insert_stmt($get_db,__PACKAGE__,$insert_ignore);
|
||||
|
||||
}
|
||||
|
||||
sub getupsertstatement {
|
||||
|
||||
check_table();
|
||||
my $db = &$get_db();
|
||||
my $table = $db->tableidentifier($tablename);
|
||||
my $upsert_stmt = 'INSERT OR REPLACE INTO ' . $table . ' (' .
|
||||
join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @$expected_fieldnames) . ')';
|
||||
my @values = ();
|
||||
foreach my $fieldname (@$expected_fieldnames) {
|
||||
if ('delta' eq $fieldname) {
|
||||
my $stmt = 'SELECT \'' . $updated_delta . '\' FROM ' . $table . ' WHERE ' .
|
||||
$db->columnidentifier('service_number') . ' = ?';
|
||||
push(@values,'COALESCE((' . $stmt . '), \'' . $added_delta . '\')');
|
||||
} else {
|
||||
push(@values,'?');
|
||||
}
|
||||
}
|
||||
$upsert_stmt .= ' VALUES (' . join(',',@values) . ')';
|
||||
return $upsert_stmt;
|
||||
|
||||
}
|
||||
|
||||
sub gettablename {
|
||||
|
||||
return $tablename;
|
||||
|
||||
}
|
||||
|
||||
sub check_table {
|
||||
|
||||
return checktableinfo($get_db,
|
||||
__PACKAGE__,$tablename,
|
||||
$expected_fieldnames,
|
||||
$indexes);
|
||||
|
||||
}
|
||||
|
||||
1;
|
||||
@ -1,2 +1,2 @@
|
||||
default:
|
||||
default: 'NCOS 0'
|
||||
UPC:
|
||||
default: '0'
|
||||
|
||||
Loading…
Reference in new issue