Our Python utility provides the following functionality.
Major functionality:
- Process a batch of large csv/text files
- Can iterate in sub-folders
- File can be filtered on criteria expressed in regular expression format
- Reading csv/text files
- Performing sanity and basic column based validation checks like null values, duplicate values, max, min, unique values etc.
- It also infers data types for all columns
- SQL Server connection
- On-the-fly table creation and bulk insertion for any given text/csv file
- Logging of all activities using a separate logger module
- Highly configurable utility
Main.py:
import Utility
import Logger
import FieldTypes
import SQLServerConnection
import datetime
import os
import re
import string
import pyodbc
#start time
Logger.LogMessage(str(datetime.datetime.now()))
dbConn = SQLServerConnection();
temp = Utility()
#read source directory
reader = open("srcdir.txt","rb")
srcdir = reader.readline()
iterate source directory
temp.iterateFolder(srcdir)
temp.reportProfile()
print "profiling done"
reader.close();
#end time
Logger.LogMessage(str(datetime.datetime.now()))
print datetime.datetime.now()
Utility.py:
import SQLServerConnection
import Field
import csv
import os
import re
import string
from Logger import Logger
import datetime
class Utility:
fileName = ""
def __init__(self):
#initialise class variable
#map for fieldName -> fieldInformation
self.data = dict()
self.profileOutput = open('C:\\temp\\profile.csv','w')
self.output = ['FileName','FieldName','InferredDataType','Len','TotalRows','NullValues','UniqueValues']
self.profileOutput.write(','.join(self.output))
self.profileOutput.write('\n')
self.profileList = []
def readcvsFile(self, fileName, onlyName):
ROWSEPARATER = "\n"
FIELDSEPARATER = "\t"
self.data = dict()
#read as a text file
reader = open(fileName,"rb")
headers = reader.readline().split(FIELDSEPARATER)
#header row
Column = len(headers)
#initialised the map
for column in headers:
self.data[column]=Field.Field(column.rstrip())
Logger.LogMessage("No of column: " + str(Column))
#process every row
RowCount = 0
ErrorRow = 0
for rowstring in reader:
RowCount = RowCount + 1
rowCol = rowstring.count(FIELDSEPARATER) + 1
row = rowstring.split(FIELDSEPARATER)
#need to check value of last col
#if it has "," it means there are more delimiter in text file
if rowCol == Column:
i =0
for column in row:
(self.data[headers[i]]).addValue(column)
i = i + 1
else:
Logger.LogMessage("Row number " + str(RowCount) + " is not valid. It has " + str(rowCol) + " columns.")
ErrorRow = ErrorRow + 1
Logger.LogMessage("No of Rows processed(Other than header file):"+str(RowCount))
Logger.LogMessage("Total Errors: "+str(ErrorRow))
reader.close();
name, extention = onlyName.split(".")
createFile = "create table " + name + "("
fieldCount = 1
for k,v in self.data.iteritems():
#mark end of processing and take profiled output for each field
v.endOfLoading()
self.profileList.append([fileName, v.FieldName, v.FieldString, str(v.FieldSize), str(v.RecordSize), str(v.NullValues), str(v.UniqueValues)])
if fieldCount > 1:
createFile = createFile + ", "
createFile = createFile + v.FieldName + " varchar(1000) "
fieldCount = fieldCount + 1
v.clear()
createFile = createFile + ")"
Logger.LogMessage("Sql for file creation:" + createFile)
SQLServerConnection.cursor.execute(createFile)
SQLServerConnection.conn.commit()
bulkcopySQL = "BULK INSERT " + name + " FROM '" + fileName + "' WITH ( FIELDTERMINATOR='\t',FIRSTROW=2,ROWTERMINATOR='" + chr(10) + "')"
Logger.LogMessage("running bluk copy:" + bulkcopySQL)
SQLServerConnection.cursor.execute(bulkcopySQL)
SQLServerConnection.conn.commit()
del self.data
def iterateFolder(self,dir):
fileexp = re.compile(r'\w*\.csv')
#iterate for the directory
for f in os.listdir(dir):
if os.path.isfile(os.path.join(dir,f)) and fileexp.match(f) is not None:
Logger.LogMessage("***************************************")
Logger.LogMessage("FileName: " + f)
Logger.LogMessage("Directory Name: " + dir)
self.readcvsFile(os.path.join(dir,f), f)
Logger.LogMessage("*************************************")
Logger.LogMessage(str(datetime.datetime.now()))
Logger.LogMessage(" ")
print "Processing " , f
Logger.flush()
elif os.path.isdir(os.path.join(os.getcwd(),f)):
print os.path.join(dir,f)
self.iterateFolder(os.path.join(dir,f))
def reportProfile(self):
for detail in self.profileList:
self.profileOutput.write(','.join(detail))
self.profileOutput.write('\n')
Field.py:
from types import *
import re
import datetime
import Logger
class FieldTypes:
Null,Integer, Float, Varchar, Date, DateTime = range(6)
intexp = re.compile('[0-9]+$')
floatexp = re.compile('[0-9]*\.[0-9]+$')
dateexp = re.compile(r'[0-9]{1,4}([.,/-])[0-9]{1,2}\1[0-9]{1,4}$')
#this functions returns corresponding string for enum type
def getEnumString(self, type):
if type == self.Integer:
return "Integer"
if type is self.Float:
return "numeric"
if type is self.Varchar:
return "varchar"
if type is self.Date:
return "date"
if type is self.Null:
return "null"
#this function infers datatype using combination of new value and calculated datatype so far
def fieldType(self,value, previousType):
if len(value) is 0:
return previousType
if previousType is self.Float:
if self.floatexp.match(value) is not None:
return self.Float
if previousType is self.Date:
if self.dateexp.match(value) is not None:
return self.Date
if previousType is self.Null:
if self.intexp.match(value) is not None:
return self.Integer
if self.floatexp.match(value) is not None:
return self.Float
if self.dateexp.match(value) is not None:
return self.Date
if previousType is self.Integer:
if self.intexp.match(value) is not None:
return self.Integer
if self.floatexp.match(value) is not None:
return self.Float
if self.dateexp.match(value) is not None:
return self.Date
return self.Varchar
def getFieldType(self,valueList):
Type = self.Null
for value in valueList:
if Type is self.Varchar:
break
Type = self.fieldType(value, Type)
del valueList
return Type
#Field class represent a complete field domain in the table. It has all properties
#related to a field like name, datatype, size, min value etc. It has contains all
#list of values for that particular field.
class Field:
def __init__(self, name, storeValue=0):
self.FieldName = name
self.FieldSize = 0
self.FieldType = FieldTypes.Null
self.FieldValues = set()
self.RecordSize = 0
self.UniqueValues = 0
self.FieldString = ""
self.NullValues = 0
self.storeVal = storeValue
def addValue(se1lf,val):
self.RecordSize = self.RecordSize +1
#print self.FieldValues
if len(val) is 0:
self.NullValues = self.NullValues + 1
else:
if self.storeVal is not 0:
self.FieldValues.add((val.rstrip()))
if len(val) > self.FieldSize:
self.FieldSize = len(val)
def clear(self):
del self.FieldValues
def endOfLoading(self):
if self.storeVal is not 0:
self.FieldType = FieldTypes().getFieldType(self.FieldValues)
self.UniqueValues = len(self.FieldValues)
self.FieldString = FieldTypes().getEnumString(self.FieldType)
def printSummary(self):
Logger.LogMessage(self.FieldName + " " + self.FieldString+ " " +str(self.FieldSize)+ str(self.RecordSize)+ str(self.NullValues)+ str(self.UniqueValues))
def printValues(self):
for value in self.FieldValues:
print value


