Our Python utility provides the following functionality.
Major functionality:
- Process a batch of large CSV/text files
- Can iterate recursively through sub-folders
- Files can be filtered on criteria expressed as regular expressions
- Reading CSV/text files
- Performing sanity and basic column-based validation checks such as null values, duplicate values, max, min, unique values, etc.
- It also infers the data type of every column
- SQL Server connection
- On-the-fly table creation and bulk insertion for any given text/CSV file
- Logging of all activities using a separate logger module
- Highly configurable utility
Main.py:
"""Main.py -- entry point: profile every CSV file under the directory named in
srcdir.txt, bulk-load each file into SQL Server, and write the profile report."""
import datetime

import Logger
# NOTE(review): the original did `import SQLServerConnection` / `import Utility`
# and then called the module names directly, which raises TypeError; assuming each
# module exposes a class of the same name (as Utility.py does) -- confirm.
from SQLServerConnection import SQLServerConnection
from Utility import Utility


def main():
    # start-of-run timestamp
    Logger.LogMessage(str(datetime.datetime.now()))

    # connection is opened for its side effects; Utility uses module-level cursor
    dbConn = SQLServerConnection()
    util = Utility()

    # srcdir.txt holds the root directory to scan (first line only)
    # fix: text mode + strip() -- the original kept the trailing newline in srcdir
    # and never closed the handle on error
    with open("srcdir.txt", "r") as reader:
        srcdir = reader.readline().strip()

    # iterate the source directory tree and emit the profile report
    util.iterateFolder(srcdir)
    util.reportProfile()
    print("profiling done")

    # end-of-run timestamp
    Logger.LogMessage(str(datetime.datetime.now()))
    print(datetime.datetime.now())


if __name__ == "__main__":
    main()
Utility.py:
import csv
import datetime
import os
import re
import string

import Field
import SQLServerConnection
from Logger import Logger


class Utility:
    """Profiles tab-delimited text/CSV files and bulk-loads each into SQL Server.

    For every file processed, per-column statistics (inferred type, max length,
    row/null/unique counts) accumulate in ``profileList`` and are written out by
    ``reportProfile``.
    """

    # legacy class attribute retained for backward compatibility (unused here)
    fileName = ""

    def __init__(self):
        # map of header name -> Field profiler for the file currently being read
        self.data = dict()
        # NOTE(review): output path is hard-coded; consider making it configurable
        self.profileOutput = open('C:\\temp\\profile.csv', 'w')
        self.output = ['FileName', 'FieldName', 'InferredDataType', 'Len',
                       'TotalRows', 'NullValues', 'UniqueValues']
        self.profileOutput.write(','.join(self.output))
        self.profileOutput.write('\n')
        self.profileList = []

    def readcvsFile(self, fileName, onlyName):
        """Profile one tab-delimited file, then create a matching table and bulk-load it.

        fileName -- full path to the file (also used in the BULK INSERT statement);
        onlyName -- bare file name, used to derive the table name.
        """
        FIELDSEPARATER = "\t"
        self.data = dict()
        # fix: text mode (was "rb") so line endings are normalized
        reader = open(fileName, "r")
        try:
            # header row; strip the line ending so the last column name is clean
            headers = reader.readline().rstrip("\n").split(FIELDSEPARATER)
            Column = len(headers)
            for column in headers:
                self.data[column] = Field.Field(column.rstrip())
            Logger.LogMessage("No of column: " + str(Column))

            RowCount = 0
            ErrorRow = 0
            for rowstring in reader:
                RowCount += 1
                row = rowstring.rstrip("\n").split(FIELDSEPARATER)
                # only rows with exactly the header's column count are profiled
                if len(row) == Column:
                    for header, value in zip(headers, row):
                        self.data[header].addValue(value)
                else:
                    Logger.LogMessage("Row number " + str(RowCount) +
                                      " is not valid. It has " + str(len(row)) + " columns.")
                    ErrorRow += 1
            Logger.LogMessage("No of Rows processed(Other than header file):" + str(RowCount))
            Logger.LogMessage("Total Errors: " + str(ErrorRow))
        finally:
            # fix: guarantee the handle is released even if profiling fails
            reader.close()

        # fix: splitext instead of split(".") -- names with several dots no longer crash
        name = os.path.splitext(onlyName)[0]
        # the table name comes from a file name, i.e. untrusted input: escape "]"
        # and bracket-quote the identifier (DDL identifiers cannot be parameterized)
        safeName = "[" + name.replace("]", "]]") + "]"

        columns = []
        for v in self.data.values():  # fix: iteritems() removed (Python 3 compatible)
            # finalize the per-column stats and capture one profile row
            v.endOfLoading()
            self.profileList.append([fileName, v.FieldName, v.FieldString,
                                     str(v.FieldSize), str(v.RecordSize),
                                     str(v.NullValues), str(v.UniqueValues)])
            columns.append(v.FieldName + " varchar(1000) ")
            v.clear()
        createFile = "create table " + safeName + "(" + ", ".join(columns) + ")"

        Logger.LogMessage("Sql for file creation:" + createFile)
        SQLServerConnection.cursor.execute(createFile)
        SQLServerConnection.conn.commit()

        # double any single quote in the path so the literal cannot break the statement
        bulkcopySQL = ("BULK INSERT " + safeName + " FROM '" +
                       fileName.replace("'", "''") +
                       "' WITH ( FIELDTERMINATOR='\t',FIRSTROW=2,ROWTERMINATOR='" +
                       chr(10) + "')")
        Logger.LogMessage("running bulk copy:" + bulkcopySQL)
        SQLServerConnection.cursor.execute(bulkcopySQL)
        SQLServerConnection.conn.commit()
        del self.data

    def iterateFolder(self, dir):
        """Recursively process every *.csv file under ``dir``."""
        # fix: anchored with $ so e.g. "a.csv.bak" is no longer picked up
        fileexp = re.compile(r'\w*\.csv$')
        for f in os.listdir(dir):
            path = os.path.join(dir, f)
            if os.path.isfile(path) and fileexp.match(f) is not None:
                Logger.LogMessage("***************************************")
                Logger.LogMessage("FileName: " + f)
                Logger.LogMessage("Directory Name: " + dir)
                self.readcvsFile(path, f)
                Logger.LogMessage("*************************************")
                Logger.LogMessage(str(datetime.datetime.now()))
                Logger.LogMessage(" ")
                print("Processing " + f)
                Logger.flush()
            elif os.path.isdir(path):
                # fix: recurse on the dir-relative path; the original joined with
                # os.getcwd(), which broke for any directory other than the CWD
                print(path)
                self.iterateFolder(path)

    def reportProfile(self):
        """Write every accumulated profile row to the profile CSV."""
        for detail in self.profileList:
            self.profileOutput.write(','.join(detail))
            self.profileOutput.write('\n')
        # flush so the report survives even though the handle is never closed
        self.profileOutput.flush()
Field.py:
from types import *
import datetime
import re

import Logger


class FieldTypes:
    """Inferable column data types (small-int enum values) plus inference helpers."""

    Null, Integer, Float, Varchar, Date, DateTime = range(6)

    # anchored with $ so the whole cell value must match; compiled once per class
    intexp = re.compile(r'[0-9]+$')
    floatexp = re.compile(r'[0-9]*\.[0-9]+$')
    # e.g. 2020/1/15 -- the back-reference forces the same separator twice
    dateexp = re.compile(r'[0-9]{1,4}([.,/-])[0-9]{1,2}\1[0-9]{1,4}$')

    def getEnumString(self, type):
        """Return the display/SQL type name for an enum value (None if unknown)."""
        # fix: DateTime had no case and silently produced None
        names = {
            self.Null: "null",
            self.Integer: "Integer",
            self.Float: "numeric",
            self.Varchar: "varchar",
            self.Date: "date",
            self.DateTime: "datetime",
        }
        return names.get(type)

    def fieldType(self, value, previousType):
        """Merge one new cell value into the type inferred so far.

        Empty cells keep the previous type; any value fitting no narrower
        pattern demotes the column to Varchar.
        """
        if len(value) == 0:
            return previousType
        # fix: '==' instead of identity ('is') checks on ints throughout
        if previousType == self.Float:
            # fix: integer literals are valid floats; previously a lone integer
            # demoted an already-Float column straight to Varchar
            if self.floatexp.match(value) is not None or self.intexp.match(value) is not None:
                return self.Float
        elif previousType == self.Date:
            if self.dateexp.match(value) is not None:
                return self.Date
        elif previousType == self.Null or previousType == self.Integer:
            if self.intexp.match(value) is not None:
                return self.Integer
            if self.floatexp.match(value) is not None:
                return self.Float
            if self.dateexp.match(value) is not None:
                return self.Date
        return self.Varchar

    def getFieldType(self, valueList):
        """Infer a single type for a whole column of string values."""
        inferred = self.Null
        for value in valueList:
            if inferred == self.Varchar:
                break  # cannot narrow further; stop early
            inferred = self.fieldType(value, inferred)
        return inferred


# Field (defined below) models one column: name, inferred type, size, null/unique counts.
class Field:
    """Profile accumulator for a single column of one file.

    Tracks name, longest value, total/null counts and (when ``storeValue`` is
    non-zero) the set of distinct values used for type inference.
    """

    def __init__(self, name, storeValue=0):
        self.FieldName = name
        self.FieldSize = 0                  # longest value seen, in characters
        self.FieldType = FieldTypes.Null    # enum value, set by endOfLoading
        self.FieldValues = set()            # distinct values (only if storeVal)
        self.RecordSize = 0                 # number of values fed in
        self.UniqueValues = 0
        self.FieldString = ""               # readable type name, set by endOfLoading
        self.NullValues = 0
        self.storeVal = storeValue

    def addValue(self, val):
        """Fold one raw cell value into the profile counters.

        fix: the parameter was misspelled 'se1lf', so every 'self' in the body
        was an unbound global name and each call raised NameError.
        """
        self.RecordSize += 1
        if len(val) == 0:
            self.NullValues += 1
        else:
            if self.storeVal != 0:  # fix: '!=' instead of 'is not' on an int
                self.FieldValues.add(val.rstrip())
            if len(val) > self.FieldSize:
                self.FieldSize = len(val)

    def clear(self):
        """Release the stored value set; the instance is not reusable afterwards."""
        del self.FieldValues

    def endOfLoading(self):
        """Finalize: infer the column type and unique count (only if values were stored)."""
        if self.storeVal != 0:
            self.FieldType = FieldTypes().getFieldType(self.FieldValues)
            self.UniqueValues = len(self.FieldValues)
            self.FieldString = FieldTypes().getEnumString(self.FieldType)

    def printSummary(self):
        """Log a one-line summary of this column's profile."""
        Logger.LogMessage(self.FieldName + " " + self.FieldString + " " +
                          str(self.FieldSize) + str(self.RecordSize) +
                          str(self.NullValues) + str(self.UniqueValues))

    def printValues(self):
        """Dump every stored distinct value to stdout (debug aid)."""
        for value in self.FieldValues:
            print(value)