OXIESEC PANEL
- Current Dir:
/
/
home
/
cubot
/
bin
/
etl
Server IP: 139.59.38.164
Upload:
Create Dir:
Name
Size
Modified
Perms
📁
..
-
02/03/2023 08:06:37 AM
rwxr-xr-x
📄
config.pl
6.53 KB
02/03/2023 07:34:08 AM
rwxr-xr-x
📄
dataextract.sh
2.51 KB
02/03/2023 07:34:08 AM
rwxr-xr-x
📄
etl.pl
5.46 KB
02/03/2023 07:34:08 AM
rwxr-xr-x
📄
etlproc
11.81 KB
02/03/2023 07:34:08 AM
rwxr-xr-x
📄
func.pl
4.01 KB
02/03/2023 07:34:08 AM
rwxr-xr-x
📄
globvar.sh
330 bytes
02/03/2023 07:34:08 AM
rw-r--r--
📄
initetl.pl
1.88 KB
02/03/2023 07:34:08 AM
rwxr-xr-x
📄
libawk.awk
3.07 KB
02/03/2023 07:34:08 AM
rw-r--r--
📄
makeXtab2NormalExcel.pl
5.25 KB
02/03/2023 07:34:08 AM
rw-r--r--
📄
process.pl
6.92 KB
02/03/2023 07:34:08 AM
rwxr-xr-x
📄
process_changed.pl
5.69 KB
02/03/2023 07:34:08 AM
rwxr-xr-x
📄
readwrite.pl
4.16 KB
02/03/2023 07:34:08 AM
rwxr-xr-x
📄
rxcsv.pl
7.94 KB
02/03/2023 07:34:08 AM
rwxr-xr-x
📄
rxdb.pl
8.84 KB
02/03/2023 07:34:08 AM
rwxr-xr-x
📄
rxfile.pl
4.72 KB
02/03/2023 07:34:08 AM
rwxr-xr-x
📄
rxsalesforce.pl
3.33 KB
02/03/2023 07:34:08 AM
rw-r--r--
📄
rxsalesforce.pl.bak
3.33 KB
02/03/2023 07:34:08 AM
rw-r--r--
📄
rxxml.pl
12.66 KB
02/03/2023 07:34:08 AM
rw-r--r--
📄
rxxml2.pl
4.18 KB
02/03/2023 07:34:08 AM
rw-r--r--
📄
s.log
8.86 KB
02/03/2023 07:34:08 AM
rw-r--r--
📄
sendmail.pl
1.06 KB
02/03/2023 07:34:08 AM
rwxr-xr-x
📄
sourcequery.pl
4.17 KB
02/03/2023 07:34:08 AM
rw-r--r--
📄
sql.pl
13.89 KB
02/03/2023 07:34:08 AM
rwxr-xr-x
📄
sql_mssql.pl
13.89 KB
02/03/2023 07:34:08 AM
rwxr-xr-x
📄
store.pl
7.6 KB
02/03/2023 07:34:08 AM
rwxr-xr-x
📄
task.pl
30.55 KB
02/03/2023 07:34:08 AM
rwxr-xr-x
📄
task_mssql.pl
30.8 KB
02/03/2023 07:34:08 AM
rwxr-xr-x
📄
tmpxml.pl
1.49 KB
02/03/2023 07:34:08 AM
rw-r--r--
📄
userfunc.pl
5.5 KB
02/03/2023 07:34:08 AM
rw-r--r--
📄
valid.pl
3.9 KB
02/03/2023 07:34:08 AM
rw-r--r--
📄
workflow.pl
3.72 KB
02/03/2023 07:34:08 AM
rwxr-xr-x
📄
xform.pl
2.62 KB
02/03/2023 07:34:08 AM
rw-r--r--
Editing: task.pl
Close
#!/usr/bin/perl #use warnings; =head #------------------------------------------------------------------------- #####################Pseudo Code For Execution Of Task#################### #------------------------------------------------------------------------- 1. Get task details (Sourceid, Destid, Validity) 2. If task is not valid return ERROR 'Invalid task' 3. Create list of transformations 3. Connect to source ( In non db case try to access directory ) 4. If Connection Fails return ERROR 'Source Connection Failed'. 5. If source is database create source query & execute it. 6. while ( read data from source ) repeat 7,8 7. Apply Transform 8. Output to file #------------------------------------------------------------------------- =cut #SQL_wfmaster sub executetask{ my $taskid = shift; my $tablemapping = SQL_tablemapping($taskid); my ($mapname, $repid, $storeid, $srctableid ,$desttableid, $isvalid, $xfdisable, $validdisable, $xftype) = @$tablemapping; printlog($INFO, "Xfdisable:$xfdisable:Validdisable:$validdisable:"); if(($isvalid eq 'N') or ($isvalid eq 'I')){ $G_Err = "Invalid task status"; return 'ERR'; } export_etlconf(); $global_var1="$ENV{global_var1}"; $CUBETL_DATA_FROM_DATE="$ENV{CUBETL_DATA_FROM_DATE}"; $CUBETL_DATA_TO_DATE="$ENV{CUBETL_DATA_TO_DATE}"; statuslog "Execution Started for task id : $taskid [ $mapname ] "; # START: Resolving Source & Applying Validation + Xform. and Create Out File for loading mkdir("$RXXLS_DIR/$taskid") or printlog($INFO, "Using existing Folder for staging $RXXLS_DIR/$taskid"); $G_Stg_Outfile = "$RXXLS_DIR/$taskid/out.txt"; $G_Stg_Errfile = "$RXXLS_DIR/$taskid/$G_DB.err"; if($xftype ne 'U'){ open($G_DESTOUT, ">$G_Stg_Outfile") or return $G_Err = "Unable to write staging file"; open($G_DATAERR, ">$G_Stg_Errfile") or return $G_Err = "Unable to write error file"; chmod 0777, "$G_Stg_Outfile"; chmod 0777, "$G_Stg_Errfile"; } do_preload_sql($desttableid, $taskid);#Pre SQL && Pre Snapshot my $source_store_details = SQL_storemaster($storeid); my ($storename, $storetype, $dirdsn, $createdate, $storevalid, $storedesc, $lstupdt_ts, $expand, $storecat, $dbdirid) = @$source_store_details; my $srcqrystr = ''; my $response = ''; my $col_xform_details; my @destcolname = (); if(($xfdisable eq 1)&& ($validdisable eq 1)&&($G_Purpose ne "xformpreview")){ printlog($INFO, "Transformation Disabled and Validation Disabled"); $col_xform_details=SQL_columnmaster($desttableid); my $ord=0; foreach my $sourcecolrow(@$col_xform_details){ $destcolname[$ord] = @$sourcecolrow[1]; $ord++; } } else { printlog($INFO, "This is real transformation without preview"); $col_xform_details = SQL_xform($taskid); @xformrule = get_xform_rules($col_xform_details, \@destcolname);#destcolname for load } foreach $c(@destcolname){ printlog($MAX, "Destination Column Name=$c"); } if(($storetype eq 'D') || ($storetype eq 'S')){ #DataBase printlog($MAX, "Processing Database Task ..."); $response = xtract_database_source($taskid, \@$source_store_details, \@$col_xform_details, \@xformrule, $tablemapping); } else{ #$storetype eq 'F'/'E'/'X' printlog($MAX, "Processing Non Database Task ..."); $response = xtract_directory_source($taskid, \@$source_store_details, \@$col_xform_details, \@xformrule, $tablemapping); } if($xftype ne 'U'){ close $G_DESTOUT; close $G_DATAERR; } if($response eq 'ERR'){ return 'ERR'; } # DONE: Resolving Source & Applying Validation + Xform # START: Load Out File to destination table if($G_Purpose eq "xformpreview"){ return 'DONE'; } statuslog "--- STARTING LOADING PROCESS ".localtime(); load_destination($desttableid, \@destcolname, $taskid);#Load ; Post SQL && Post Snapshot statuslog "--- DONE LOADING PART ".localtime(); if($G_Err eq ''){ $G_Err = "DONE"; } else{ statuslog "$G_Err"; } return "$G_Err"; } sub run_database_task{ my ($taskid, $dbstoremaster, $srcqrystr, $xformrule, $srccols, $col_xform_details, $tablemapping) = @_; $srcqrystr = $srcqrystr.";"; $srcqrystr =~ s/;;$/;/; $srcqrystr =~ s/CUBETL_DATA_TO_DATE/'$CUBETL_DATA_TO_DATE'/; $srcqrystr =~ s/CUBETL_DATA_FROM_DATE/'$CUBETL_DATA_FROM_DATE'/; my ($xfdisable, $validdisable ) = (@$tablemapping[6], @$tablemapping[7]); my $src_skip_header; my ($dsn, $db, $dbtype, $user, $pass, $host, $port, $dbschema) = @$dbstoremaster; if($port == 0){ $port = '1521'; } $OUTFILE = "$RXXLS_DIR/$taskid/outdbdata.txt"; print "\ndbtype # ".$dbtype; statuslog "STARTING EXTRACTION PART for $dbtype ".localtime(); if($dbtype =~ m/racle/si){ printlog($INFO, "Processing : Language Oracle"); #$ENV{'NLS_LAN'}=".WE8MSWIN1252"; open(ORAFILE, ">$RXXLS_DIR/$taskid/ora.tmprx.1"); chmod 0777, "$RXXLS_DIR/$taskid/ora.tmprx"; print ORAFILE "export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/$CUBOT_HOME/sqlplus/;\n"; print ORAFILE "$CUBOT_HOME/sqlplus/sqlplus -s $user/\$1\@\'(DESCRIPTION =(ADDRESS_LIST =(ADDRESS = (PROTOCOL = TCP)(HOST = $host)(PORT = $port )))(CONNECT_DATA =(SERVICE_NAME = $db)))' << ! > $OUTFILE\n"; # print ORAFILE "sqlplus -s $user/$pass $db@$host<< ! > $OUTFILE\n\n"; print ORAFILE "set sqlprompt \"\"\n"; print ORAFILE "set termout off serverout off feedback off heading off sqlblanklines off\n"; print ORAFILE "set newpage none\n"; print ORAFILE "set arraysize 5000\n"; print ORAFILE "set linesize 9999\n"; #ALTER SESSION SET NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS''; print ORAFILE "ALTER SESSION SET NLS_TIMESTAMP_FORMAT='YYYY-MM-DD HH24:MI:SS';\n"; print ORAFILE "ALTER SESSION SET NLS_DATE_FORMAT='YYYY-MM-DD';\n"; my $qrysep = "||'$G_DBOutDelim'||"; #$srcqrystr =~ s/SLASH_N_CHAR/\n/g; print ORAFILE " $srcqrystr\n"; print ORAFILE "!"; system("sed 's/SLASH_N_CHAR/\\n/g;s/,/,\\n/g;s/ and /\\n and /gi;' $RXXLS_DIR/$taskid/ora.tmprx.1 > $RXXLS_DIR/$taskid/ora.tmprx"); my $cmd = "export NLS_LANG=.WE8MSWIN1252;sh $RXXLS_DIR//$taskid/ora.tmprx"; #system($cmd); `$CUBOT_HOME/bin/etl/etlproc "$cmd" "$pass"`; my $oramsg = check_oracle_error($OUTFILE); if($oramsg eq 'ERR'){ return 'ERR'; } } else{ open(ORAFILE, ">$RXXLS_DIR/$taskid/sql.tmprx"); chmod 0777, "$RXXLS_DIR/$taskid/sql.tmprx"; if($dbtype =~ m/MsSQL/si){ print ORAFILE "export TDSVER=8.0\n"; print ORAFILE "isql $dsn $user \$1 -b -d'|'<< ! 1&>$OUTFILE\n"; print ORAFILE " $srcqrystr\n"; print ORAFILE "!"; $cmd = "sh $RXXLS_DIR/$taskid/sql.tmprx"; `$CUBOT_HOME/bin/etl/etlproc "$cmd" "$pass"`; printlog($INFO, "running command is $CUBOT_HOME/bin/etl/etlproc $cmd $pass"); my $oramsg = check_sql_error($OUTFILE); if($oramsg eq 'ERR'){ return 'ERR'; } } # Bug No: 1167 elsif($dbtype =~ m/Postgre/si){ my $dypass=`$CUBOT_HOME/bin/etl/etlproc "echo \$1" "$pass"`; chop($dypass); print ORAFILE "export PGPASSWORD=$dypass\n"; print ORAFILE "psql -A -t -h $host -d $db -U $user -c \"$srcqrystr\" -o $OUTFILE\n"; $cmd = "sh $RXXLS_DIR/$taskid/sql.tmprx"; printlog($INFO, "$cmd"); `$CUBOT_HOME/bin/etl/etlproc "$cmd" "$pass"`; printlog($INFO, "running command is $CUBOT_HOME/bin/etl/etlproc $cmd $pass"); my $oramsg = check_sql_error($OUTFILE); if($oramsg eq 'ERR'){ return 'ERR'; } } elsif($dbtype =~ m/alesforce/si){ chop($srcqrystr); chmod 0777, $OUTFILE; open(FHOUTFILE, ">$OUTFILE"); my $dypass=`$CUBOT_HOME/bin/etl/etlproc "echo \$1" "$pass"`; chop($dypass); print FHOUTFILE getQueryData($user, $dypass, $dbschema, $srcqrystr); } else{ if((defined $host) && ($host ne "") && ($host !~ /localhost/i)){ $cmd = "mysql -h $host -u $user --password=\$1 $db -N -e \"$srcqrystr\" > $OUTFILE.1"; } else{ $cmd = "mysql $db -N -e \"$srcqrystr\" > $OUTFILE.1"; } print ORAFILE "$cmd"; $cmd = "sh $RXXLS_DIR/$taskid/sql.tmprx"; `$CUBOT_HOME/bin/etl/etlproc "$cmd" "$pass"`; system("sed -e \"s/\t/$G_DBOutDelim/g\" $OUTFILE.1 > $OUTFILE"); } } statuslog "--- DONE EXTRACTION PART for $dbtype ---"; $src_skip_header[0] = 0; $src_skip_header[1] = 0; printlog($INFO, "process_masked_file Started : ", time); my $dateformatcols = get_dateform_col($taskid); my $length=scalar @$dateformatcols; if(($xfdisable != 1)||( $validdisable != 1)){ process_masked_file($OUTFILE, "$RXXLS_DIR/$taskid/", "F", "", "$G_DBOutDelim", \@src_skip_header, $taskid, $xformrule, $srccols, $col_xform_details, 0, "N", $tablemapping, $txtqf); printlog($INFO, "process_masked_file Ended : ", time); } if($length >0){ $G_datecol='Y'; process_masked_file($OUTFILE, "$RXXLS_DIR/$taskid/", "F", "", "$G_DBOutDelim", \@src_skip_header, $taskid, $xformrule, $srccols, $col_xform_details, 0, "N", $tablemapping, $txtqf); } elsif($length <1 && $xfdisable == 1 && $validdisable == 1){ #If no default Date format-AND-Transformation disabled-AND-Validation disabled BUG NO:1049 $G_ora='Y'; # Load outdbdata file without any transformation } if($G_Purpose eq "xformpreview"){ process_masked_file($OUTFILE, "$RXXLS_DIR/$taskid/", "F", "", "$G_DBOutDelim", \@src_skip_header, $taskid, $xformrule, $srccols, $col_xform_details, 0, "N", $tablemapping, $txtqf); printlog($INFO, "process_masked_file Ended : ", time); } close(ORAFILE); } #BUG:859 -> Will provide the actual table name from the mask format sub getMaskTableName{ $maskFormat = shift; $indx= index($maskFormat, '%'); $format= substr($maskFormat, $indx); $date = `date +$format` ; $rest = length($maskFormat) - $indx; substr($maskFormat, $indx, $rest, $date); return trim($maskFormat); } sub xtract_database_source{ # START: Creating Source Query my ($mapid, $source_store_details, $col_xform_details, $xformrule, $tablemapping) = @_; my ($storename, $storetype, $dirdsn, $createdate, $storevalid, $storedesc, $lstupdt_ts, $expand, $storecat, $dbdirid) = @$source_store_details; my ($storeid, $srctableid) = (@$tablemapping[2], @$tablemapping[3]); my ($xfdisable, $validdisable ) = (@$tablemapping[6], @$tablemapping[7]); my $ord = 0; my @srccols = (); my $dbstoremaster = SQL_dbstore($storeid); my ($dsn, $db, $dbtype, $user, $pass, $host, $dbschema) = @$dbstoremaster; #BUG:859 -> Accept mask format from database, $etm_storeid is not used, Taken to accept $maskFormat which is next to it my $srctabdtl = SQL_tablemaster($srctableid); my ($dbmaskid, $srctabname, $tabtype, $srcdest, $etm_storeid, $maskFormat) = @$srctabdtl; printlog($INFO, "Source $storename is Database. DSN=$dirdsn and TABType:$tabtype-MSKID:$dbmaskid-TID:$srctableid"); my $srcqrystr = ''; if($tabtype eq 'Q'){ my $sqry = SQL_querydetails($srctableid); printlog($INFO, "DB Gen S-QRY: $sqry"); $sqry =~ s/\n/ /gsi; my ($collist, $aftr_from) = getqcols($sqry); printlog($MAX, "[$collist]"); if($dbtype =~ m/racle/si){#Looking Oracle $qrysep = "||'$G_DBOutDelim'||SLASH_N_CHAR"; if($collist eq ' *'){ my $i=0; $collist=""; my $source_colmaster = SQL_columnmaster_NoSource($srctableid); foreach my $sourcecolrow(@$source_colmaster){ if($i ==0){ $collist=@$sourcecolrow[1]; } else{ $collist=$collist.",".@$sourcecolrow[1]; } $i++; } printlog($MAX, "All columns from table"); } $collist=$collist.","."\'endl\'"; printlog($MAX, "my collist is $collist"); $srcqrystr = makeOraPipedQuery($sqry, $collist, $qrysep); } elsif($dbtype =~ m/alesforce/si){ $srcqrystr = 'SELECT ' . $collist . ' FROM ' . $aftr_from; } else{ #$srcqrystr = 'select ' . $collist.',\'endl\''.' from' . $aftr_from; $srcqrystr = 'SELECT ' . $collist.',\'endl\''.' FROM' . $aftr_from; } printlog($MAX, "Final DB Gen S-QRY: $srcqrystr"); @srccols = (); my $source_colmaster = SQL_columnmaster($srctableid); foreach my $sourcecolrow(@$source_colmaster){ $srccols[$ord] = @$sourcecolrow[1]; $ord++; } } else{ my $sqry = SQL_querydetails($srctableid); if(($xfdisable eq 1)&& ($validdisable eq 1)){ my $source_colmaster = SQL_columnmaster($srctableid); foreach my $sourcecolrow(@$source_colmaster){ $srccols[$ord] = @$sourcecolrow[1]; $ord++; } } else{ foreach my $xform(@$col_xform_details){# Because in case of DB we retrv data in proeper xformed order.. my ($srccolid, $srccolname, $destcolid , $destcolname, $include, $desttblid, $srctblid, $xfrule) = @$xform; next if($include ne 'Y');#Ignoring cols if not included in mapping $srccols[$ord] = $srccolname; #print "@@@@@@@@@@@@@@@@@@@ Next Source Column [$ord]= $srccols[$ord]\n"; $ord++; } } #BUG:859 -> Get Actual file name from mask format if defined if(length($maskFormat) > 0){ $srctabname=getMaskTableName($maskFormat); } $srcqrystr = create_database_source_query($dbtype, $srctabname, \@srccols, $tabtype);#in Q case this will be columns #print ("SUNEEL srcquery \n\n".$srcqrystr); } printlog($MAX, "Database[$db:$dbtype:$host] Query=[$srcqrystr]"); my $response = ''; printlog($MAX, "Running $dbtype Task"); $G_numcols=$ord; $response = run_database_task($mapid, $dbstoremaster, $srcqrystr, \@$xformrule, \@srccols, \@$col_xform_details, $tablemapping); if($response eq 'ERR'){ return 'ERR'; } return 'CUB_OK'; } #File & Excel sub xtract_directory_source{ my ($taskid, $source_store_details, $col_xform_details, $xformrule, $tablemapping) = @_; my ($storename, $storetype, $dirdsn, $createdate, $storevalid, $storedesc, $lstupdt_ts, $expand, $storecat, $dbdirid) = @$source_store_details; my $dbstoremaster = SQL_fstore_map($taskid); my ($storeid, $srctableid, $tablename, $valid, $maskfmt, $ftype, $fdelim, $txtqf) = @$dbstoremaster; if(($txtqf ne '') && ($txtqf ne '0')){ $G_enclosed = $txtqf; } else{ $G_enclosed = ''; } $G_numcols=0; my $src_skip_header = SQL_srctable_details($srctableid); printlog($MAX, "Source $storename is XLS. DIR=$dirdsn [ $maskfmt ]"); my @srccols = (); my $source_colmaster = SQL_columnmaster($srctableid); my $ord = 0; foreach my $sourcecolrow(@$source_colmaster){ $srccols[$ord] = @$sourcecolrow[1]; $ord++; } my $srctabdtl = SQL_tablemaster($srctableid); my ($dbmaskid, $srctabname, $tabtype, $srcdest) = @$srctabdtl; my @maskedfiles = (); if($storetype eq 'X'){ printlog($MAX, "Source $storename is XLS. DIR=$dirdsn [ $maskfmt ]"); $indirmask = $maskfmt; if(write_xmlformat2csv($dirdsn, $indirmask) != -9999){ write_xmldata2csv($dirdsn, $indirmask); } process_masked_file($maskfile, $dirdsn, $storetype, $srctabname, $fdelim, $src_skip_header, $taskid, \@$xformrule, \@srccols, $col_xform_details, $dbmaskid, 'N', $tablemapping, $txtqf); } else{#non XML my $maskdtl = SQL_fstore($dbmaskid);#// maskdtl[7] will have ident=Y if identical my $allsh = ''; $allsh = "*" if(@$maskdtl[7] eq 'Y'); @maskedfiles = get_masked_files($dirdsn, $maskfmt); $G_NumRecIgnr = 0; foreach my $maskfile(@maskedfiles){ if($storetype eq 'E'){ wait_for_xls($maskfile, $maskfmt, $allsh); } if($storetype eq 'M'){ use File::Basename; my $filename = basename ($maskfile); process_mdb($dirdsn, $filename); } process_masked_file($maskfile, $dirdsn, $storetype, $srctabname, $fdelim, $src_skip_header, $taskid, \@$xformrule, \@srccols, $col_xform_details, $dbmaskid, @$maskdtl[7], $tablemapping, $txtqf); } } return 'CUB_OK'; } ########################################################################## # ALL KIND OF TASK FINALLY GIVE FILE TO PROCESS # # OBJECTIVE HERE IS GET FILE OUT FROM DB, XML, XLS THEN PROCESS IT. # # XFORM & VALIDATION WILL GET APPLIED ON FILE # ########################################################################## sub process_masked_file{ my ($maskfile, $dirdsn, $storetype, $srctabname, $fdelim, $src_skip_header, $mapid, $xformrule, $srccols, $col_xform_details, $dbmaskid, $ident, $tablemapping, $txtqf) = @_; my ($infilename, $srcdelim, $srcftype, $mysrcstmt, $prnt) = (); my ($xfdisable, $validdisable ) = (@$tablemapping[6], @$tablemapping[7]); @G_inputDataArray = (); my $validation ; @datecolname =(); my $validrules = get_validation_rules($mapid); my $dateformatcols = get_dateform_col($mapid); my $dateformat; #if($G_fname eq ""){ # Issue 1024 $G_fname= getfname($maskfile);# ARUN #} printlog($MAX, "File Being Proccessed = $G_fname" ); my $ord=0; foreach my $datecolrow(@$dateformatcols){ @datecolname[$ord] = @$datecolrow[0]; @datecolname[$ord]= @datecolname[$ord].":".@$datecolrow[1]; printlog($MAX, "After @datecolname[$ord]"); $ord++; } #Now creating setting as source file configuration if($storetype eq 'E'){ printlog($INFO, "Parsing File : [ $maskfile ]"); my $stgfile = $srctabname; $stgfile = 'allsheets.csv' if($ident eq 'Y'); $infilename = $RXXLS_DIR."/".getdirname("$maskfile/")."/".$stgfile; $srcdelim = $G_PerlOutDelim; $srcftype = 'D'; } elsif($storetype eq 'F'){ #TODO: Need to handle for fixed width also printlog($INFO, "Parsing File : [ $maskfile ]"); $infilename = $maskfile; $srcdelim = $fdelim; $srcftype = 'D'; } elsif($storetype eq 'X'){ $infilename = $RXXLS_DIR."/XML".getdirname("$dirdsn/").".$maskstr"."/".$srctabname; $srcdelim = $G_PerlOutDelim; $srcftype = 'D'; } elsif($storetype eq 'M'){ my $stgfile = $srctabname; $infilename = $RXXLS_DIR."/".getdirname("$maskfile/")."/".$stgfile; $srcdelim = "|"; $srcftype = 'D'; } #Now Parsing Source file if($srcdelim eq 'S'){ $srcdelim = ' '; } elsif($srcdelim eq 'T'){ $srcdelim = "\t"; } printlog($MAX, "StoreType=[$storetype]\tFileDelim=[$srcdelim]\tSKIP[1]=",@$src_skip_header[1],"\tDIR=$dirdsn\tINF=$infilename"); open($mysrcstmt, $infilename); for($i=0; $i<@$src_skip_header[1]+@$src_skip_header[0]; $i++){#Skip rows + Header rows @G_inputDataArray = doselectline($mysrcstmt, $srcftype, $srcdelim, "", $txtqf, $storetype, \@datecolname,$tablemapping); $G_NumRecIgnr++; #if($indata[0] eq 'ERR'){ # last; #} } # Below Code Is Written (tried) More From Speed Perspective. Code Reusabilty & Optimization will not be found. # Here time is imp my $errln = ''; my $ln = ''; $xfdisable = 0 if not defined $xfdisable; $validdisable = 0 if not defined $validdisable; statuslog "STARTING TRANSFORM PART with xfdisable=$xfdisable and validdisable=$validdisable ".localtime(); printlog($MAX, "XF_DIS=$xfdisable and VALID_DIS=$validdisable"); $prnt = 0; $xftype = @$tablemapping[8]; if($xftype eq 'U'){ # User Defined Method For XF #1. User Can write XF & Validation In His Script #2. Output Generated By USER Defined script should be in below format #2a.ETL will pass arguments to userscript as last arg like below # taskid=$taskid##delim=$G_PerlOutDelim##outfile=$G_Staging_Dir/$taskid/out.txt##errfile=$G_Staging_Dir/$taskid/out.err## #2b.Output File should be outfile as from 2a arg #2c.Output Fields Terminated By delim as from 2a arg my $cmd = "taskid=$mapid##delim=$G_PerlOutDelim##infile=$maskfile##outfile=$G_Stg_Outfile##errfile=$G_Stg_Errfile##"; $cmd = "@$xformrule[0] $cmd"; statuslog "Firing USER DEFINED COMMAND SCRIPT [$cmd]. Now waiting.."; system($cmd); printlog($MAX, "-------------->>>>>>>>>>>>>>> Done: [$!] $? <<<<<<<<<<<<<---------------------"); } else{ if($G_Purpose eq "xformpreview"){#for preview process 10 recds #IF LOOKING PREVIEW PART for($prnt=0; $prnt<10; $prnt++) { @G_inputDataArray = doselectline($mysrcstmt, $srcftype, $srcdelim, "", $txtqf, $storetype, \@datecolname, $tablemapping); if($G_inputDataArray[0] eq 'ERR'){ last; } #FIXME: Wrong for Preview: Bcz here we are not using xfdisable & validdisable if(validate_input(\@G_inputDataArray, \@$srccols, \%$validrules, $col_xform_details)){ resolve_xforms(\@G_inputDataArray, \@$xformrule, \@$srccols, $txtqf); } } } elsif(($xfdisable eq "1") && ($validdisable eq "1")){#IF XFORM & VALID NOT APPLICABLE while(@G_inputDataArray = doselectline($mysrcstmt, $srcftype, $srcdelim, "", $txtqf, $storetype, \@datecolname, $tablemapping)){ last if($G_inputDataArray[0] eq 'ERR'); if($G_inputDataArray[0] ne "blanklines"){ putdata_without_xform(\@G_inputDataArray, \@$srccols); } } } elsif(($xfdisable eq "1") && ($validdisable eq "0")){#IF XFORM:NO & VALID:YES while(@G_inputDataArray = doselectline($mysrcstmt, $srcftype, $srcdelim, "", $txtqf, $storetype, \@datecolname, $tablemapping)){ last if($G_inputDataArray[0] eq 'ERR'); if(validate_input(\@G_inputDataArray, \@$srccols, \%$validrules, $col_xform_details)){ putdata_without_xform(\@G_inputDataArray, \@$srccols); } } } elsif(($xfdisable eq "0") && ($validdisable eq "1")){#IF XFORM:YES & VALID:NO while(@G_inputDataArray = doselectline($mysrcstmt, $srcftype, $srcdelim, "", $txtqf, $storetype, \@datecolname, $tablemapping)){ last if($G_inputDataArray[0] eq 'ERR'); resolve_xforms(\@G_inputDataArray, \@$xformrule, \@$srccols, $txtqf); } } else{#IF BOTH THR while(@G_inputDataArray = doselectline($mysrcstmt, $srcftype, $srcdelim, "", $txtqf, $storetype, \@datecolname, $tablemapping)){ if($G_inputDataArray[0] eq 'ERR'){ last; } my $validate_op = validate_input(\@G_inputDataArray, \@$srccols, \%$validrules, $col_xform_details); if($validate_op == -1){ last; } if($validate_op){ resolve_xforms(\@G_inputDataArray, \@$xformrule, \@$srccols, $txtqf); } } } } statuslog "--- DONE TRANSFORM PART ".localtime(); close($mysrcstmt); } sub putdata_without_xform { my $indata = shift; my $colnames = shift; my $col = '', $outstr = ''; $outstr = ''; my $len= scalar @$colnames - 1; my @data = @$indata[0 .. $len]; $outstr = join($G_PerlOutDelim, @data); #printlog($MAX, "Ouput Data ==> !!!!!!(@$indata)======>($outstr)!!!!!!"); print $G_DESTOUT "$outstr\n"; } ############################################# # COLLECTING XF & VALID RULES IN THIS BLOCK # ############################################# sub get_xform_rules{ my $col_xform_details = shift; my $destcols = shift; my $ord = 0; my @xformrule = (); foreach $xform(@$col_xform_details){ my ($srccolid, $srccolname, $destcolid , $destcolname, $include, $desttblid, $srctblid, $xfrule) = @$xform; next if($include ne 'Y');#Ignoring cols if not included in mapping printlog($INFO, "$srccolid, $srccolname, $destcolid ,$destcolname, $include"); if(($xfrule =~ /Selec.*/) || ($xfrule eq '')){ $xformrule[$ord] = $srccolname; } else{ $xformrule[$ord] = $xfrule; } @$destcols[$ord] = $destcolname; printlog($MAX, "Adding Name To Dest List : $destcolname"); $ord++; } return @xformrule; } sub get_validation_rules{ my $mapid = shift; my %validrules; my $i = 0; my $validation ; $validation = SQL_col_validation($mapid); foreach my $vrule(@$validation){ $validrules{@$vrule[3]} = \@$vrule; $i++; printlog($MAX, "Validation Rule[$i] ,@$vrule[3]"); } return \%validrules } ############################################# # QUERY RELATED THING ( PRE, POST & OTHER ) ############################################# sub do_preload_sql{ my $desttableid = shift; my $mapid = shift; my $desttable_details = SQL_desttable_details($mapid); my ($did, $trunctable, $replacedata, $postsqlid, $presqlid, $snapshot, $snapfreq, $snapdatecolid) = @$desttable_details; my $snapdtl = (); if($snapdatecolid ne ''){ $snapdtl = SQL_snapcol_details($snapdatecolid); } my ( $snapdatecol )= @$snapdtl; my $destdb = $G_DB; statuslog "--- EXECUTING PRE LOAD SQL ---"; if($presqlid ne ''){ execute_sql_query($presqlid, $destdb); }else{ statuslog "NO PreSQL Found"; } statuslog "--- DONE PreSQL ---"; if( $snapshot eq 'Y' ){ my $desttabdtl = SQL_tablemaster($desttableid); my ($dbmaskid, $desttabname, $tabtype, $srcdest, $storeid ) = @$desttabdtl; statuslog "--- EXECUTING PRE SNAP"; my $preqry = "delete from $desttabname where yrmth($snapdatecol) = yrmth(DATE_SUB(now(), INTERVAL 1 DAY));"; printlog($MAX, "Executing Pre SNAP: $preqry"); if($ENV{CUBOT_DBPASS} ne ''){ open ($PIPE, "mysql -p$ENV{CUBOT_DBPASS} -u $ENV{CUBOT_DBUSER} $destdb -e \"$preqry\" 2>$G_Shell_Errfile |"); } else{ open ($PIPE, "mysql $destdb -e \"$preqry\" 2>$G_Shell_Errfile |"); } close($PIPE); statuslog "--- DONE PreSNAP ---"; } } sub execute_sql_query{ my $qryid = shift; my $destdb = shift; # Now Lets Run Post Load SQL my $postqry = SQL_querydetails($qryid); return 1 if($postqry eq ''); if($postqry =~ m/;/g){ #Do nothing } else{ $postqry = $postqry.";"; } $postqry=~s/"/\\"/g; $postqry=~s/global_var1/$global_var1/g; $postqry =~ s/CUBETL_DATA_TO_DATE/'$CUBETL_DATA_TO_DATE'/; $postqry =~ s/CUBETL_DATA_FROM_DATE/'$CUBETL_DATA_FROM_DATE'/; printlog($INFO, "Executing SQL => $postqry"); my $PIPE; if($ENV{CUBOT_DBPASS} ne ''){ open ($PIPE, "mysql -p$ENV{CUBOT_DBPASS} -u $ENV{CUBOT_DBUSER} $destdb -e \"$postqry\" 2>$G_Shell_Errfile |"); } else{ open ($PIPE, "mysql $destdb -e \"$postqry\" 2>$G_Shell_Errfile |"); } close($PIPE); my $filesize = -s $G_Shell_Errfile ; if($filesize ne "0"){ open($PIPE, $G_Shell_Errfile); $G_Err = <$PIPE>; return 'ERR'; } return 'DONE'; #TODO: Set $G_Err here } ###################################### # LOAD RELATED THINGS IN THIS BLOCK # ###################################### sub load_destination{ my $desttableid = shift; my $destcolname = shift; my $mapid = shift; my ($oldrc, $newrc) = (); # Record counters my $destcolstr = ''; $destcolstr = join (',' , @$destcolname); printlog($MAX, "Destination={$destcolstr}"); my $desttabdtl = SQL_tablemaster($desttableid); my ($dbmaskid, $desttabname, $tabtype, $srcdest, $storeid ) = @$desttabdtl; my $destdb = $G_DB; my $destdsn = "DBI:mysql:".$destdb.":localhost"; my $destconn = opendb($destdsn, $ENV{"CUBOT_DBUSER"}, $ENV{"CUBOT_DBPASS"}); statuslog "Table Used For Destination : $desttabname"; printlog($MAX, "Checking For Proper Destination Table: SELECT $destcolstr FROM $desttabname limit 1"); $destconn->do("SELECT $destcolstr FROM $desttabname limit 1") or return $G_Err= 'Improper destination table.'; #$destconn->do("SELECT 1 FROM $desttabname limit 1") or create_desttab($destconn, $desttabname, $desttableid); $desttable_details = SQL_desttable_details($mapid); my ($did, $trunctable, $replacedata, $postsqlid, $presqlid, $snapshot, $snapfreq, $snapdatecolid) = @$desttable_details; my $snapdtl = (); if($snapdatecolid ne ''){ $snapdtl = SQL_snapcol_details($snapdatecolid); } my ( $snapdatecol )= @$snapdtl; $oldrc = 0; $newrc = 0; statuslog "Do I need to to truncate table [ Y/N ] : $trunctable"; if($trunctable eq "Y"){ statuslog "------------------- TRUNCATING DESTINATION TABLE : $desttabname ----------------------"; truncate_desttable($destdb, $desttabname); } else{ my $res = doselectone("select count(*) from $desttabname"); statuslog "Fetching Old Record Count in $desttabname = @$res"; $oldrc = @$res[0]; } my $fcount = 0; if($G_ora eq 'Y'){ use File::Basename; my $oradir=dirname($G_Stg_Outfile); my $orafile=$oradir."/outdbdata.txt"; $fcount = `grep 'endl'<$orafile|wc -l`; } else{ $fcount = `wc -l < $G_Stg_Outfile`; } printlog($INFO, "Error: Line Counting failed: $?") if $?; chomp($fcount); load_data_infile($destdb, $desttabname, $replacedata, $destcolstr); if( $snapshot eq 'Y' ){ my $desttabdtl = SQL_tablemaster($desttableid); my ($dbmaskid, $desttabname, $tabtype, $srcdest, $storeid ) = @$desttabdtl; printlog($MAX, "---- EXECUTING POST SNAP ----"); my $postqry = "update $desttabname set $snapdatecol=DATE_SUB(now(), INTERVAL 1 DAY) where $snapdatecol is null or $snapdatecol ='0000-00-00'; "; printlog($INFO, "---- Executing Post SNAP: $postqry"); if($ENV{CUBOT_DBPASS} ne ''){ open ($PIPE, "mysql -p$ENV{CUBOT_DBPASS} -u $ENV{CUBOT_DBUSER} $destdb -e \"$postqry\" 2>$G_Shell_Errfile |"); } else{ open ($PIPE, "mysql $destdb -e \"$postqry\" 2>$G_Shell_Errfile |"); } close($PIPE); printlog($INFO, "---- PostSNAP: Completed"); } printlog($INFO, "---- EXECUTING POST LOAD SQL ----"); execute_sql_query($postsqlid, $destdb); printlog("PostSQL: $postsqlid Done"); printlog($INFO, "---- COMPLETED POST LOAD SQL ----"); #execute_sql_query($desttableid, $destdb); printlog($INFO, "----- Execution Done for task id : $taskid -----"); my $res = doselectone("select count(*) from $desttabname"); $newrc = @$res[0]; $G_NumRecTotal = $newrc; $G_NumRecProc = $fcount; $G_NumRecDisc = $fcount - ($newrc - $oldrc); # Rec in File - Newly Inserted Rec in DB } sub truncate_desttable{ my ($destdb, $desttabname) = @_; printlog($INFO, "mysql --local-infile=1 -p$ENV{\"CUBOT_DBPASS\"} $destdb"); if($ENV{CUBOT_DBPASS} ne ""){ `mysql --local-infile=1 -p$ENV{CUBOT_DBPASS} -u $ENV{CUBOT_DBUSER} $destdb <<! truncate table $desttabname; !` } else{ `mysql --local-infile=1 $destdb <<! truncate table $desttabname; ! ` } } sub load_data_infile{ my ($destdb, $desttabname, $replace, $destcols ) = @_; my $isreplace = 'IGNORE'; if($replace eq 'Y'){ $isreplace = 'REPLACE'; } printlog($INFO, "Determine Is Oracle = $G_ora"); use File::Basename; $oradir=dirname($G_Stg_Outfile); $orafile=$oradir."/outdbdata.txt"; my $qf = ""; if(($G_enclosed ne '')&&($G_enclosed ne '0')){ my $tmpsupfile="/tmp/out_new1.txt"; open(SUP, "$G_Stg_Outfile") or die "$!"; open(TMP, "> $tmpsupfile") or die "$!"; my $line; while (<SUP>) { $line=$_; chomp $line; $line =~ s/$G_enclosed//g; #printlog($MAX, "This line will come as a output [$line]"); print TMP "$line\n"; } close(TMP) or die "$!"; close(SUP) or die "$!"; `mv $tmpsupfile $G_Stg_Outfile`; } statuslog "Loading data using $isreplace option"; my ($dbtype, $database, $host, $user, $pass, $home ) = get_etlparams(); my $destdbconn = opendb("DBI:$dbtype:$G_DB:$host;mysql_emulated_prepare=1;", "$user", "$pass"); if($G_ora eq 'Y'){ $query = "load data infile '$orafile' $isreplace into table $desttabname fields terminated by '$G_DBOutDelim' lines terminated by 'endl\n' "; } else{ $query = "load data infile '$G_Stg_Outfile' $isreplace into table $desttabname fields terminated by '$G_PerlOutDelim' $qf ($destcols)"; } printlog($MAX, "LOADING DATA - $query"); $G_enclosed = ''; my $result = $destdbconn->do($query); loading_msg_print($destdbconn); $G_ora='N'; printlog($INFO, "OUT= $?, => $!") if $?; closedb($destdbconn); # `rm -rf $oradir`; } 1;