import os
import re


def matchline(line, match):
    """Print *line* when the compiled regex *match* is found in it.

    Returns 0 when the pattern matches (after printing the line) and -1
    otherwise; callers test the integer, so both values are kept.
    """
    if match.search(line) is None:
        return -1
    print("-----------------------------------------")
    print("Got the line...." + line)
    return 0


def readfilestring(filename, match):
    """Scan *filename* line by line and report every line matching *match*."""
    # The context manager closes the handle even if a read fails; the
    # previous readline() loop never closed the file at all.
    with open(filename, 'r') as handle:
        for text in handle:
            if matchline(text, match) == 0:
                print("<----in file" + filename)
                print("-----------------------------------------")


def search_file(search_path, filename, stringpatt):
    """Recursively grep files below *search_path*.

    search_path -- directory to walk
    filename    -- compiled regex selecting file names to inspect
    stringpatt  -- compiled regex searched for inside each selected file

    Directories that cannot be listed are reported and skipped.
    """
    try:
        entries = os.listdir(search_path)
    except Exception as e:
        print("Search error for OS issue - ")
        print(e)
        return
    for d in entries:
        # Plain concatenation (not os.path.join) keeps the printed paths
        # identical to what the script has always shown.
        full_path = search_path + os.sep + d
        if os.path.isdir(full_path):
            search_file(full_path, filename, stringpatt)
        elif os.path.isfile(full_path):
            if filename.search(d) is not None:
                readfilestring(full_path, stringpatt)
        else:
            print("Unknown filesystem object - " + full_path)


if __name__ == '__main__':
    try:
        prompt = raw_input  # Python 2
    except NameError:
        prompt = input  # Python 3
    search_path = prompt("Enter search_path: ")
    filepattern = prompt("Enter filepattern: ")
    stringpattern = prompt("Enter stringpattern: ")
    try:
        # retest remembers which pattern was being compiled for the message.
        retest = "filepattern"
        ire = re.compile(filepattern, re.I)
        retest = "stringpattern"
        strre = re.compile(stringpattern, re.I)
    except Exception as e:
        print("Reg Ex problem - for " + retest)
        print(e)
    else:
        if os.path.exists(search_path):
            find_file = search_file(search_path, ire, strre)
        else:
            print("Not a valid path - " + search_path)
Find text/pattern under parent folder from matched file names
XML to CSV Converter
def xml2csv(inputFile, outputFile, elemName, ns, columnsToExtract):
    """Stream the *elemName* records of an XML file into a CSV file.

    inputFile        -- path of the XML document
    outputFile       -- path of the CSV file to create
    elemName         -- local name of the record element
    ns               -- namespace prefix prepended to elemName and stripped
                        from child tags (presumably '{uri}' -- confirm with callers)
    columnsToExtract -- column names; child elements not listed are ignored

    Relies on `csv` and an `etree` whose iterparse() accepts `tag=` being
    in scope at file level.
    """
    outFile = open(outputFile, 'w')
    try:
        writer = csv.DictWriter(outFile, columnsToExtract,
                                extrasaction='ignore', restval='')
        # DictWriter.writerow() needs a mapping; handing it the bare list of
        # column names (as before) fails, so map each name to itself.
        writer.writerow(dict((col, col) for col in columnsToExtract))
        # iterparse keeps only part of the document in memory; each record
        # is cleared as soon as it has been written.
        for event, rec in etree.iterparse(inputFile, tag="%s%s" % (ns, elemName)):
            row = dict()
            for child in rec:
                # Empty elements have text None; treat them as empty strings.
                row[child.tag[len(ns):]] = (child.text or '').strip()
            rec.clear()
            writer.writerow(row)
    finally:
        outFile.close()
XLS to CSV converter
def xls2csv(inputFile, outputFile):
    """Write the first sheet of the workbook *inputFile* to *outputFile* as CSV.

    Relies on `csv` and `xlrd` being imported at file level.
    """
    print("Input File : %s" % (inputFile,))
    print("Output File : %s" % (outputFile,))
    outFile = open(outputFile, 'w')
    try:
        writer = csv.writer(outFile, delimiter=',', quotechar='"')
        book = xlrd.open_workbook(inputFile)
        sheet = book.sheet_by_index(0)
        for row in range(sheet.nrows):
            writer.writerow(sheet.row_values(row))
    finally:
        # The handle used to be created inline and never closed, which can
        # leave the tail of the CSV unflushed.
        outFile.close()
Spell Corrector in Python
import re
import collections


def words(text):
    """Return all lowercase alphabetic runs found in *text*."""
    return re.findall('[a-z]+', text.lower())


def train(features):
    """Count occurrences of each feature; unseen words default to 1."""
    model = collections.defaultdict(lambda: 1)
    for f in features:
        model[f] += 1
    return model


def _read_corpus(path):
    """Return the whole text of *path*, closing the file afterwards."""
    # open() replaces the Python-2-only file() builtin, and the context
    # manager releases the handle that used to be leaked.
    with open(path) as corpus:
        return corpus.read()


NWORDS = train(words(_read_corpus('big.txt')))

alphabet = 'abcdefghijklmnopqrstuvwxyz'


def edits1(word):
    """Return every string one edit (delete/transpose/replace/insert) from *word*."""
    splits = [(word[:i], word[i:]) for i in range(len(word) + 1)]
    deletes = [a + b[1:] for a, b in splits if b]
    transposes = [a + b[1] + b[0] + b[2:] for a, b in splits if len(b) > 1]
    replaces = [a + c + b[1:] for a, b in splits for c in alphabet if b]
    inserts = [a + c + b for a, b in splits for c in alphabet]
    return set(deletes + transposes + replaces + inserts)


def known_edits2(word):
    """Return the known words that are two edits away from *word*."""
    return set(e2 for e1 in edits1(word) for e2 in edits1(e1) if e2 in NWORDS)


def known(words):
    """Return the subset of *words* present in the trained model."""
    return set(w for w in words if w in NWORDS)


def correct(word):
    """Return the most frequent known candidate for *word*, or *word* itself."""
    candidates = known([word]) or known(edits1(word)) or known_edits2(word) or [word]
    return max(candidates, key=NWORDS.get)
Format a plain ASCII file using Python script
#!/bin/env python
# Paginate a text file, adding a header and footer

import sys
import time
import string

# Page-break character sent between pages.  The flattened source wrote an
# empty string here, so no page break was ever emitted.
FORM_FEED = '\f'


class PrinterFormatter:
    """Write lines to stdout, inserting a page header every *page_len* lines."""

    def __init__(self, filename, page_len=58):
        # Save the time of creation for inclusion in the header
        self.now = time.asctime(time.localtime(time.time()))
        # Save the filename and page length
        self.filename = filename
        self.page_len = page_len
        # Zero all the counters
        self.page = 0
        self.count = 0
        self.header_written = 0

    def write_header(self):
        """Emit the page header unless one was just written."""
        if self.header_written:
            return
        # Increment the page count, and reset the line count
        self.header_written = 1
        self.count = 1
        self.page = self.page + 1
        # str.ljust replaces string.ljust, which no longer exists in Python 3.
        p = str(self.page)
        header = self.filename.ljust(38 - len(p)) + '[' + p + ']'
        header = header.ljust(79 - len(self.now)) + self.now
        sys.stdout.write(header + '\n\n')

    def writeline(self, L):
        """Write one line of text, starting a new page first when needed."""
        # If the line is an exact multiple of 80 characters long, chop off
        # the trailing newline, since the printhead will wrap around
        length = len(L)
        if (length % 80) == 0 and L and L[-1] == '\n':
            L = L[:-1]
        # If we've printed a pageful of lines, output a form feed and
        # output the header.
        if self.count > self.page_len:
            sys.stdout.write(FORM_FEED)
            self.write_header()
        # Print the actual line of text
        sys.stdout.write(L)
        self.count = self.count + 1
        self.header_written = 0


def main(argv=None):
    """Paginate the file named on the command line to stdout."""
    if argv is None:
        argv = sys.argv
    # If no arguments were given, print a helpful message
    if len(argv) != 2:
        print('Usage: pyprint filename')
        sys.exit(0)
    # Open the input file, and create a PrinterFormatter object, passing
    # it the filename to put in the page header.
    f = open(argv[1], 'r')
    try:
        o = PrinterFormatter(argv[1])
        o.write_header()  # Print the header on the first page
        # writeline() adds page breaks and headers where required.
        for L in f:
            o.writeline(L)
        # Write a final page break.
        sys.stdout.write(FORM_FEED)
    finally:
        f.close()


# Guarded so the formatter class can be imported without running the script.
if __name__ == '__main__':
    main()
Control Structures in Python
If ==== # basic "If" if boolean: x = 1 else: x = 0 # Not so basic "If" # Notice the new keywords! if (boolean and otherboolean) or ( not boolean ): #oops, this is just if(boolean), oh well... :~) x = 1 elif not otherboolean: x = 0 else: x = 100000000000000000000 # Big numbers make if statements less boring! While ===== # This is it... while i < 100: i = i + 1 # More generally: while condition == True: """do the following""" ... ... condition = False # Infinite Loop: while 1: print "I'm loopy!" FOR ==== # Basic For Loop for item in list: dosomething(item) # The traditional C for loop has a different form in Python: for x in range(0, 100): dosomething(x) # More interesting example: def buglove( UIUC ): for bug in UIUC: if type( bug ) == type( chinese_lady_beetle() ): crush( bug ) else: hug( bug )
Python script to zip and unzip files
# Simple Application/Script to Compress a File or Directory
# Essentially you could use this instead of Winzip
"""
Path can be a file or directory
Archname is the name of the to be created archive
"""
from zipfile import ZipFile, ZIP_DEFLATED
import os   # File stuff
import sys  # Command line parsing


def zippy(path, archive):
    """Add every file below directory *path* to the open ZipFile *archive*."""
    for entry in os.listdir(path):
        full_path = os.path.join(path, entry)  # keep the name relative to path
        if os.path.isdir(full_path):
            zippy(full_path, archive)  # recursive case
        else:
            archive.write(full_path)  # write the file to the zipfile
    return


def zipit(path, archname):
    """Compress the file or directory *path* into the new archive *archname*.

    Returns a human-readable success message.
    """
    # Create a ZipFile object primed to write ("a" to append, "r" to read)
    archive = ZipFile(archname, "w", ZIP_DEFLATED)
    try:
        # Recurse or not, depending on what path is
        if os.path.isdir(path):
            zippy(path, archive)
        else:
            archive.write(path)
    finally:
        # Always finalize the archive, even when adding a member fails.
        archive.close()
    return "Compression of \"" + path + "\" was successful!"


# The pieces used to be concatenated without separators, which produced
# run-together text such as "intoa single archive.e.g.:".
instructions = "zipit.py:  Simple zipfile creation script.\n" + \
               "recursively zips files in a directory into " + \
               "a single archive.\n" + \
               "e.g.:  python zipit.py myfiles myfiles.zip"

# The __name__ == "__main__" test controls what Python does when the file
# is run from the command line rather than imported.
if __name__ == "__main__":
    if len(sys.argv) >= 3:
        result = zipit(sys.argv[1], sys.argv[2])
        print(result)
    else:
        print(instructions)

# ================================================================================
# Simple script to Unzip archives created by
# our Zip Scripts.
import sys
import os
from zipfile import ZipFile, ZIP_DEFLATED


def unzip(path):
    """Extract every member of the zip archive *path* into the current directory.

    Returns a human-readable success message.
    """
    # NOTE(review): member names are used as-is, so an archive from an
    # untrusted source can write outside the current directory through
    # absolute or '..' paths.
    archive = ZipFile(path, "r", ZIP_DEFLATED)
    try:
        for name in archive.namelist():
            parent = os.path.dirname(name)
            # Top-level members have an empty dirname (mkdir('') raised
            # before), and nested members need all parents created.
            if parent and not os.path.exists(parent):
                os.makedirs(parent)
            if name.endswith("/"):
                continue  # explicit directory entry: nothing to write
            data = archive.read(name)  # read the binary data
            with open(name, "wb") as out:
                out.write(data)
    finally:
        archive.close()
    return "\"" + path + "\" was unzipped successfully."


instructions = "This script unzips plain jane zipfiles:\n" + \
               "e.g.:  python unzipit.py myfiles.zip"

if __name__ == "__main__":
    if len(sys.argv) == 2:
        msg = unzip(sys.argv[1])
        print(msg)
    else:
        print(instructions)
Fast and efficient Backup script that works on Windows and Linux
import os, os.path
import subprocess
import time


def backUpDir(path):
    """
    Creates backup of the passed old dir and creates a new dir. The backed up
    dir gets the date and time appended to its name.

    On success, returns a list consisting of two values:
        0    : to signify the success
        None : means no error occurred.
    On error, returns a list consisting of two values:
        -1           : to signify the failure
        error string : the exact error string
    """
    if os.path.exists(path):
        # dir exists: move the old dir aside before creating the new one
        backupDir = path + time.strftime('-%Y-%m-%d-%Hh%Mm%Ss')
        # os.rename works on every platform.  The earlier shell-outs were
        # broken: Windows ran a non-existent "mov" command, and on POSIX a
        # list argv combined with shell=True drops the arguments to mv.
        try:
            os.rename(path, backupDir)
        except OSError as err:
            return [-1, str(err)]
    # create new dir
    os.mkdir(path)
    return [0, None]
File Splitter Script
#!/usr/bin/env python
# encoding: utf-8
import sys
from math import *
import os
import random
from Tkinter import *
import tkFileDialog


def linesplit():
    """Split a user-chosen text file into numbered chunks of N lines each.

    The file is picked through a Tk file dialog and N is read from the
    console.  Chunks are appended to c:\\Temp\\split_<i>.txt.

    Returns (total_line_count, 1).
    """
    k1 = tkFileDialog.askopenfilename()
    ff = open(k1)
    try:
        total = sum(1 for _ in ff)
    finally:
        ff.close()
    print("total number of lines is:" + str(total))

    # int() instead of eval(): the reply is only ever meant to be a count,
    # and eval() would execute whatever the user typed.
    user_ask = int(raw_input("Specify the number of records in each file = "))
    print(user_ask)
    if user_ask <= 0:
        raise ValueError("number of records per file must be positive")

    line = 0
    i = 1
    out = None
    ff = open(k1)
    try:
        for lineee in ff:
            if out is None:
                # 'a+' is kept, so existing chunk files are appended to.
                out = open(r"c:\Temp\split_" + str(i) + ".txt", 'a+')
            # Write first, then test the boundary: the old '<' / '==' pair
            # silently dropped every N-th record.
            out.write(lineee)
            line = line + 1
            if line % user_ask == 0:
                out.close()
                out = None
                i = i + 1
    finally:
        if out is not None:
            out.close()
        ff.close()
    flag = 1
    return (line, flag)


k = linesplit()
flag = k[1]
Python script for walking the directory tree structure but excluding some directories and files
from os.path import join, getsize for root, dirs, files in walk('python/Lib/email'): print root, "consumes", print sum([getsize(join(root, name)) for name in files]), print "bytes in", len(files), "non-directory files" if 'CVS' in dirs: dirs.remove('CVS') # don't visit CVS directories """ from os.path import join, isdir, islink from os import error, listdir # We may not have read permission for top, in which case we can't # get a list of the files the directory contains. os.path.walk # always suppressed the exception then, rather than blow up for a # minor reason when (say) a thousand readable directories are still # left to visit. That logic is copied here. try: # Note that listdir and error are globals in this module due # to earlier import-*. names = listdir(top) except error, err: if onerror is not None: onerror(err) return if exclude_list != []: from copy import deepcopy temp_name = deepcopy(names) names = [item for item in temp_name if item not in exclude_list] dirs, nondirs = [], [] for name in names: if isdir(join(top, name)): dirs.append(name) else: nondirs.append(name) if topdown: yield top, dirs, nondirs for name in dirs: path = join(top, name) if not islink(path): for x in walkExclusive(path, topdown, onerror, exclude_list): yield x if not topdown: yield top, dirs, nondirs
Python XML Parser
import xml.dom.minidom


class XMLParser:
    """Read a field-mapping XML file into plain Python structures."""

    def __init__(self, filePath):
        self.xml_file_path = filePath

    def get_a_document(self):
        """Parse the configured file and return the minidom Document."""
        return xml.dom.minidom.parse(self.xml_file_path)

    def process_xml_return_dict(self):
        """Return {layer node name: (attribute list, field list)}.

        Each child element of the document root is one layer.
        """
        doc = self.get_a_document()
        # documentElement is the root even when a comment or DOCTYPE comes
        # first; childNodes[0] picked up that leading node instead.
        fieldMapping = doc.documentElement
        operations = {}
        for layer in fieldMapping.childNodes:
            if layer.nodeType != xml.dom.minidom.Node.ELEMENT_NODE:
                continue  # skip text, comments and other non-element nodes
            fieldList = self.get_fields(layer)
            attrList = self.get_attributes(layer)
            operations[layer.nodeName] = (attrList, fieldList)
        return operations

    def get_attributes(self, node):
        """Return the attributes of *node* as a list of (name, value) tuples."""
        attrList = []
        nodeMap = node.attributes
        for index in range(nodeMap.length):
            attrName = nodeMap.item(index).name
            attrList.append((attrName, node.getAttribute(attrName)))
        return attrList

    def get_fields(self, node):
        """Return (FromField, ToField, Value, Condition) for each child element."""
        fieldList = []
        for childNode in node.childNodes:
            if childNode.nodeType != xml.dom.minidom.Node.ELEMENT_NODE:
                continue
            fieldList.append((childNode.getAttribute('FromField'),
                              childNode.getAttribute('ToField'),
                              childNode.getAttribute('Value'),
                              childNode.getAttribute('Condition')))
        return fieldList


# Python pickle module & Zip file creation
# Description: shows how to use the pickle module and how to create a Zip file.

# below is a typical Python dictionary object of roman numerals
romanD1 = {'I':1,'II':2,'III':3,'IV':4,'V':5,'VI':6,'VII':7,'VIII':8,'IX':9,'X':10}

# to save a Python object like a dictionary to a file
# and load it back intact you have to use the pickle module
import pickle

print("The original dictionary:")
print(romanD1)
# Pickle streams are bytes: binary mode is required on Python 3 and avoids
# newline translation on Windows.
with open("roman1.dat", "wb") as pickle_out:
    pickle.dump(romanD1, pickle_out)

# now load the dictionary object back from the file ...
with open("roman1.dat", "rb") as pickle_in:
    romanD2 = pickle.load(pickle_in)
print("Dictionary after pickle.dump() and pickle.load():")
print(romanD2)

# for large text files you can write and read a zipped file (PKZIP format)
# notice that the syntax is mildly different from normal file read/write
import zipfile

# NOTE(review): str1 and str2 are not defined in the visible part of this
# snippet; placeholder text is supplied so the demo runs -- substitute the
# real text to be archived.
str1 = "There is no egg in eggplant.\n"
str2 = "There is no ham in hamburger.\n"

zfilename = "English101.zip"
zout = zipfile.ZipFile(zfilename, "w")
zout.writestr(zfilename, str1 + str2)
zout.close()

# read the zipped file back in
zin = zipfile.ZipFile(zfilename, "r")
strz = zin.read(zfilename)
zin.close()
print("Testing the contents of %s:" % zfilename)
print(strz)
IP Address Generation
class IP def initialize(ip) @ip = ip end def to_s @ip end def==(other) to_s==other.to_s end def succ return @ip if @ip == "255.255.255.255" parts = @ip.split('.').reverse parts.each_with_index do |part,i| if part.to_i < 255 part.succ! break elsif part == "255" part.replace("0") unless i == 3 else raise ArgumentError, "Invalid number #{part} in IP address" end end parts.reverse.join('.') end def succ! @ip.replace(succ) end end print "Input Starting IP Address: " start_ip = gets.strip print "Input Ending IP Address: " end_ip = gets.strip i = IP.new(start_ip) ofile = File.open("ips.txt", "w") ofile.puts i.succ! until i == end_ip ofile.close
Orphan Checker
# Lists every file below the current directory that is not mentioned in
# links.txt and writes the result to orphans.txt in the current directory.
links = []
orphans = []
dir_array = [Dir.getwd]

unless File.readable?("links.txt")
  puts "File is not readable."
  exit
end

File.open('links.txt', 'rb') do |lv|
  lv.each_line do |line|
    links << line.chomp
  end
end

# Breadth-first walk.  Entries are tested by full path instead of changing
# into each directory: the old Dir.chdir calls left the process in the last
# directory visited, so orphans.txt was written there rather than next to
# links.txt.
until dir_array.empty?
  dir = dir_array.shift
  Dir.foreach(dir) do |filename|
    next if filename == '.' or filename == '..'
    full_path = dir + File::SEPARATOR + filename
    if File.directory?(full_path)
      dir_array << full_path
    else
      orphans << full_path
    end
  end
end

orphans -= links

File.open("orphans.txt", "wb") do |o|
  o.puts orphans
end
HTML Email using Python
def createhtmlmail(html, text, subject):
    """Create a MIME message that renders HTML in popular MUAs, text in better ones.

    html    -- HTML body
    text    -- plain-text alternative, or None to send HTML only
    subject -- value of the Subject header (it travels inside the message
               because smtplib.sendmail expects headers in the message text)

    Returns the complete message (headers and body) as a string.
    """
    # The email package replaces MimeWriter/mimetools/cStringIO, which were
    # deprecated in Python 2 and removed in Python 3.
    from email import charset as _charset
    from email.mime.multipart import MIMEMultipart
    from email.mime.nonmultipart import MIMENonMultipart

    def _qp_part(body, subtype):
        """Build a text/<subtype> part with a quoted-printable body."""
        try:
            body.encode('ascii')
            charset_name = 'us-ascii'
        except UnicodeError:
            # Non-ASCII content cannot honestly be labelled us-ascii.
            charset_name = 'utf-8'
        part_charset = _charset.Charset(charset_name)
        part_charset.body_encoding = _charset.QP
        part = MIMENonMultipart('text', subtype)
        part.set_payload(body, charset=part_charset)
        return part

    # multipart/alternative seems to work better on some MUAs than
    # multipart/mixed
    message = MIMEMultipart("alternative")
    message["Subject"] = subject
    # The plain-text part comes first so clients prefer the HTML part.
    if text is not None:
        message.attach(_qp_part(text, "plain"))
    message.attach(_qp_part(html, "html"))
    return message.as_string()


def sendmail(sender, to, subject, htmlFilename, smtp_host="mta-hub"):
    """Send the HTML file *htmlFilename* as an e-mail.

    sender       -- envelope sender address
    to           -- recipient address, or list of addresses
    subject      -- Subject header
    htmlFilename -- path of the file holding the HTML body
    smtp_host    -- SMTP relay to use (defaults to the previously
                    hard-coded "mta-hub")
    """
    import smtplib
    with open(htmlFilename, 'r') as f:
        html = f.read()
    message = createhtmlmail(html, None, subject)
    server = smtplib.SMTP(smtp_host)
    try:
        server.sendmail(sender, to, message)
    finally:
        # Close the connection even when the relay rejects the message.
        server.quit()
Python interface to load commands.
This is just a code snippet illustrating the idea. Adding features such as exception handling, fancier formatting, error redirection, etc. is left to you!
def load_command_file(file_name):
    """Load a command file into memory as a dictionary with commands as keys.

    Each useful line has the form ``name = command template``.  Lines
    without an '=' (blank lines, notes) are skipped, and only the first '='
    separates name from template, so templates may themselves contain '='.
    """
    command_dict = {}
    # NOTE(review): this rejects names made up only of letters (e.g.
    # "commands") while the message talks about ASCII; the check is kept
    # as-is because its intent is unclear -- confirm.
    if file_name.isalpha():
        print("Error!!command file name should be in ascii\n")
        return command_dict
    with open(file_name, 'r') as command_file:
        for line in command_file:
            name, separator, template = line.strip().partition("=")
            if not separator:
                continue  # used to raise IndexError on such lines
            command_dict[name.strip()] = template.strip()
    return command_dict

########################################################################################################
def create_command(*args):
    """Build a command string from the command file.

    The last positional argument names the command; the preceding ones are
    substituted for the '%s' placeholders of its template.  Exits the
    process when the number of placeholders and arguments differ.
    """
    import sys  # sys is not imported in the visible part of this snippet
    command_dict = load_command_file(r"C:\Documents and Settings\Raj\Desktop\command_file.txt")
    arg_len = len(args)
    command_str = command_dict[args[arg_len - 1]]
    if command_str.count('%s') != arg_len - 1:
        print("Error mismatch in args passed and args actually needed")
        sys.exit()
    command = command_str % (args[:-1])
    return command
Python script to Validate Tabular format Data and database restore
Our Python utility provides all of this functionality.
Major functionality:
- Process a batch of large CSV/text files
- Can iterate in sub-folders
- File can be filtered on criteria expressed in regular expression format
- Reading CSV/text files
- Performing sanity and basic column based validation checks like null values, duplicate values, max, min, unique values etc.
- It also infers the data type of every column
- SQL Server connection
- On-the-fly table creation and bulk insertion for any given text/CSV file
- Logging of all activities using a separate logger module
- Highly configurable utility
Main.py:
import Utility
import Logger
import FieldTypes
import SQLServerConnection
import datetime
import os
import re
import string
import pyodbc

#start time
Logger.LogMessage(str(datetime.datetime.now()))

# NOTE(review): SQLServerConnection is imported as a module, so calling it
# (as before) raises TypeError.  Utility.py uses the module-level `conn`
# and `cursor` attributes, so the shared connection is referenced here --
# confirm against SQLServerConnection.py.
dbConn = SQLServerConnection.conn

# Utility.py defines class Utility inside module Utility; the bare module
# was being called before.
temp = Utility.Utility()

#read source directory
reader = open("srcdir.txt", "rb")
try:
    # Strip the line terminator, otherwise the directory name ends in a
    # newline and cannot be listed.
    srcdir = reader.readline().strip()
finally:
    reader.close()

#iterate source directory
temp.iterateFolder(srcdir)
temp.reportProfile()
print("profiling done")

#end time
Logger.LogMessage(str(datetime.datetime.now()))
print(datetime.datetime.now())
Utility.py
import SQLServerConnection
import Field
import csv
import os
import re
import string
from Logger import Logger
import datetime


class Utility:
    """Profile tab-separated text files and bulk-load them into SQL Server."""

    fileName = ""

    def __init__(self):
        # Map of header text -> Field.Field collecting per-column statistics.
        self.data = dict()
        self.profileOutput = open('C:\\temp\\profile.csv', 'w')
        self.output = ['FileName', 'FieldName', 'InferredDataType', 'Len',
                       'TotalRows', 'NullValues', 'UniqueValues']
        self.profileOutput.write(','.join(self.output))
        self.profileOutput.write('\n')
        # One row per profiled column, written out by reportProfile().
        self.profileList = []

    def readcvsFile(self, fileName, onlyName):
        """Profile one tab-separated file, create its table and bulk-insert it.

        fileName -- full path of the file
        onlyName -- bare file name; the part before the extension becomes
                    the table name
        """
        FIELDSEPARATER = "\t"
        self.data = dict()
        # read as a text file
        reader = open(fileName, "rb")
        try:
            headers = reader.readline().split(FIELDSEPARATER)  # header row
            Column = len(headers)
            # initialise the map
            for column in headers:
                self.data[column] = Field.Field(column.rstrip())
            Logger.LogMessage("No of column: " + str(Column))
            # process every row
            RowCount = 0
            ErrorRow = 0
            for rowstring in reader:
                RowCount = RowCount + 1
                rowCol = rowstring.count(FIELDSEPARATER) + 1
                row = rowstring.split(FIELDSEPARATER)
                # A different column count means the row holds extra or
                # missing delimiters.
                if rowCol == Column:
                    for i, value in enumerate(row):
                        self.data[headers[i]].addValue(value)
                else:
                    Logger.LogMessage("Row number " + str(RowCount) + " is not valid. It has " + str(rowCol) + " columns.")
                    ErrorRow = ErrorRow + 1
            Logger.LogMessage("No of Rows processed(Other than header file):" + str(RowCount))
            Logger.LogMessage("Total Errors: " + str(ErrorRow))
        finally:
            reader.close()

        # Tolerates names with several dots (split(".") raised ValueError).
        name = onlyName.rsplit(".", 1)[0]
        columnSql = []
        seen = set()
        # Walk the columns in header order: BULK INSERT maps file columns
        # to table columns by position, and dict iteration order (used
        # before) is arbitrary on Python 2.
        for header in headers:
            if header in seen:
                continue
            seen.add(header)
            v = self.data[header]
            # mark end of processing and take profiled output for each field
            v.endOfLoading()
            self.profileList.append([fileName, v.FieldName, v.FieldString,
                                     str(v.FieldSize), str(v.RecordSize),
                                     str(v.NullValues), str(v.UniqueValues)])
            columnSql.append(v.FieldName + " varchar(1000) ")
            v.clear()
        createFile = "create table " + name + "(" + ", ".join(columnSql) + ")"
        Logger.LogMessage("Sql for file creation:" + createFile)
        SQLServerConnection.cursor.execute(createFile)
        SQLServerConnection.conn.commit()
        bulkcopySQL = "BULK INSERT " + name + " FROM '" + fileName + "' WITH ( FIELDTERMINATOR='\t',FIRSTROW=2,ROWTERMINATOR='" + chr(10) + "')"
        Logger.LogMessage("running bluk copy:" + bulkcopySQL)
        SQLServerConnection.cursor.execute(bulkcopySQL)
        SQLServerConnection.conn.commit()
        del self.data

    def iterateFolder(self, dir):
        """Process every matching .csv file below *dir*, recursing into sub-folders."""
        fileexp = re.compile(r'\w*\.csv')
        for f in os.listdir(dir):
            full_path = os.path.join(dir, f)
            if os.path.isfile(full_path) and fileexp.match(f) is not None:
                Logger.LogMessage("***************************************")
                Logger.LogMessage("FileName: " + f)
                Logger.LogMessage("Directory Name: " + dir)
                self.readcvsFile(full_path, f)
                Logger.LogMessage("*************************************")
                Logger.LogMessage(str(datetime.datetime.now()))
                Logger.LogMessage(" ")
                print("Processing  " + f)
                Logger.flush()
            # Test the entry inside *dir*; the old check joined the name to
            # the current working directory, so sub-folders were missed.
            elif os.path.isdir(full_path):
                print(full_path)
                self.iterateFolder(full_path)

    def reportProfile(self):
        """Write the collected per-column profile rows to the profile CSV."""
        for detail in self.profileList:
            self.profileOutput.write(','.join(detail))
            self.profileOutput.write('\n')
        # Push the rows to disk; the handle stays open for further reports.
        self.profileOutput.flush()
Field.py:
from types import *
import re
import datetime
import Logger


class FieldTypes:
    """Enumeration of inferred column types plus the inference rules."""

    Null, Integer, Float, Varchar, Date, DateTime = range(6)
    intexp = re.compile(r'[0-9]+$')
    floatexp = re.compile(r'[0-9]*\.[0-9]+$')
    dateexp = re.compile(r'[0-9]{1,4}([.,/-])[0-9]{1,2}\1[0-9]{1,4}$')

    def getEnumString(self, type):
        """Return the display string for an enum value (None if unmapped)."""
        # Values are small ints: compare with ==, not identity.
        if type == self.Integer:
            return "Integer"
        if type == self.Float:
            return "numeric"
        if type == self.Varchar:
            return "varchar"
        if type == self.Date:
            return "date"
        if type == self.Null:
            return "null"

    def fieldType(self, value, previousType):
        """Infer the type from a new value and the type inferred so far."""
        if len(value) == 0:
            return previousType  # empty values carry no information
        if previousType == self.Float:
            if self.floatexp.match(value) is not None:
                return self.Float
        if previousType == self.Date:
            if self.dateexp.match(value) is not None:
                return self.Date
        if previousType == self.Null or previousType == self.Integer:
            if self.intexp.match(value) is not None:
                return self.Integer
            if self.floatexp.match(value) is not None:
                return self.Float
            if self.dateexp.match(value) is not None:
                return self.Date
        # Anything that does not fit the running type degrades to varchar.
        return self.Varchar

    def getFieldType(self, valueList):
        """Fold fieldType() over *valueList*, stopping once Varchar is reached."""
        Type = self.Null
        for value in valueList:
            if Type == self.Varchar:
                break
            Type = self.fieldType(value, Type)
        return Type


#Field class represents a complete field domain in the table. It has all properties
#related to a field like name, datatype, size, min value etc. It also holds the
#set of values for that particular field.
class Field:
    def __init__(self, name, storeValue=0):
        self.FieldName = name
        self.FieldSize = 0                  # longest value seen
        self.FieldType = FieldTypes.Null
        self.FieldValues = set()            # distinct values (only if storeVal)
        self.RecordSize = 0                 # number of values added
        self.UniqueValues = 0
        self.FieldString = ""
        self.NullValues = 0                 # number of empty values
        self.storeVal = storeValue

    def addValue(self, val):
        """Record one value of this column."""
        # The first parameter was misspelled 'se1lf', so every call raised
        # NameError on 'self'.
        self.RecordSize = self.RecordSize + 1
        if len(val) == 0:
            self.NullValues = self.NullValues + 1
        else:
            if self.storeVal != 0:
                self.FieldValues.add(val.rstrip())
            if len(val) > self.FieldSize:
                self.FieldSize = len(val)

    def clear(self):
        """Drop the stored values to free memory."""
        del self.FieldValues

    def endOfLoading(self):
        """Finalize the inferred type and unique-value count after loading."""
        # NOTE(review): indentation was lost in this snippet; the type
        # string is assumed to be refreshed even when values are not
        # stored -- confirm.
        if self.storeVal != 0:
            self.FieldType = FieldTypes().getFieldType(self.FieldValues)
            self.UniqueValues = len(self.FieldValues)
        self.FieldString = FieldTypes().getEnumString(self.FieldType)

    def printSummary(self):
        """Log a one-line summary of the collected statistics."""
        Logger.LogMessage(self.FieldName + " " + self.FieldString + " " + str(self.FieldSize) + str(self.RecordSize) + str(self.NullValues) + str(self.UniqueValues))

    def printValues(self):
        """Print every stored distinct value."""
        for value in self.FieldValues:
            print(value)
Python script to Load Configuration file content into variable
def LoadCfg(filename):
    """Load ``key = value`` lines from *filename* into a dictionary.

    Values are stripped of surrounding whitespace and double quotes.  Lines
    without an '=' (blank lines, notes) are skipped, and only the first '='
    separates key from value, so values may contain '='.
    """
    result = {}
    with open(filename, "r") as fileptr:
        for line in fileptr:
            key, separator, value = line.partition("=")
            if not separator:
                continue  # used to raise IndexError on such lines
            value = value.strip('\n').strip().strip('"')
            result[key.strip()] = value
    return result
Python Script to search file for specific content
def SearchFile(filename, key):
    """Return 1 if any line of *filename* contains the substring *key*, else 0."""
    # The 'in' operator replaces string.find(), which was removed in
    # Python 3 (and 'string' is not imported in this snippet); the context
    # manager closes the file on every path.
    with open(filename, "r") as fileptr:
        for line in fileptr:
            if key in line:
                return 1
    return 0
Network Wise File Concatenation
#!/usr/bin/env python import sys, os, fnmatch, re from threading import Thread from threading import BoundedSemaphore pool_sema = BoundedSemaphore(value=20) # only 10 can run at a time maxsize = 200 # 10 lines for testing source_directory = "/ranger/RangerRoot/RangerData/CDRDelegatorData/" destination_directory = "/ranger/RangerRoot/RangerData/CDRDelegatorDataPerf/" log_directory = "/ranger/RangerRoot/LOG/" Success_Directory = "/ranger/RangerRoot/RangerData/CDRDelegatorData/success/" Success_Directory_Perf = "/ranger/RangerRoot/RangerData/CDRDelegatorDataPerf/success/" filters = [] os.system('date') for i in range(1, 24): filters.append("%02d*" % i) filters.append("90*") print filters def fnumlines(filename): f = open(filename) return f.read().count("\n") class PROCESS(Thread): def __init__(self, filelist, pool_sema, network_id): Thread.__init__(self) self.list = filelist self.pool_sema = pool_sema self.network_id = network_id def run(self): self.pool_sema.acquire() filename = self.list[0] PerfSuccessFile = filename logFile = re.sub("[*]","",self.network_id) + "CDRDelegatorConcatenation.log" LogFullName = os.path.join(log_directory, logFile) log = open ( LogFullName ,"a") file = os.path.join(source_directory, filename) os.system('cp %s /ranger/ravi/subscriber/' % (file)) rm_success = os.path.join(Success_Directory,filename) filename = file touch_Perf = os.path.join(Success_Directory_Perf, PerfSuccessFile ) numlines = fnumlines(file) try: f = open(file, "a") for filename_temp in self.list[1:]: file = os.path.join(source_directory, filename_temp) os.system('cp %s /ranger/ravi/subscriber/' % (file)) rm_success_file = os.path.join(Success_Directory, filename_temp) log_file = self.network_id + "date.log" log_date = os.path.join(log_directory, log_file) if numlines < maxsize: try: os.system('date > %s' % ( log_date)) log.write(open(log_date).read()) log.write("%s is concatenating to %s \n " % (file,filename)) except IOError: log.write("Error: can\'t find file or write 
the data \n") try: f.write(open(file).read()) except IOError: os.system('date > %s' % ( log_date )) log.write(open(log_date).read()) log.write("Error occured while reading the file %s \n" % (file)) else: log.write("%s has been concatenated to %s ... DONE\n" % (file,filename)) numlines += fnumlines(file) os.system('rm %s' %(file) ) os.system('rm %s' %(rm_success_file)) else: f.close() # move the file filename os.system('mv %s %s' %(filename, destination_directory)) touch_Perf = os.path.join(Success_Directory_Perf, PerfSuccessFile) touch_file = open(touch_Perf,"w") print touch_Perf touch_file.close() PerfSuccessFile = filename_temp os.system('rm %s' % (rm_success)) # Use a new file filename = os.path.join(source_directory, file) rm_success = rm_success_file numlines = fnumlines(file) f = open(file, "a") filename = file finally: f.close() log.close() os.system('mv %s %s' %(filename, destination_directory)) touch_Perf = os.path.join(Success_Directory_Perf, PerfSuccessFile ) touch_file = open(touch_Perf,"w") print touch_Perf touch_file.close() os.system('rm %s' % (rm_success)) self.pool_sema.release() def derectory_listing(directory): flist = os.listdir(directory) for i in range(len(flist)): full_path = os.path.join(Success_Directory,flist[i]) statinfo = os.stat(full_path) flist[i] = statinfo.st_mtime,flist[i] flist.sort() x = [] for i in range(len(flist)): x.append(flist[i][1]) return x #allfiles = os.listdir(sys.argv[1]) allfiles = derectory_listing(sys.argv[1]) threadlist = [] for filter in filters: files = fnmatch.filter(allfiles, filter) if not files: continue thread = PROCESS(files, pool_sema, filter) threadlist.append(thread) thread.start() for thread in threadlist: thread.join() os.system('date') #Usage: ./thread_merge_exception.py "Directory Name"
Python Script for Adding or Subtracting the Dates
#-----------------------------
# Adding to or Subtracting from a Date
# Use the rather nice datetime.timedelta objects
import datetime

now = datetime.date(2003, 8, 6)
difference1 = datetime.timedelta(days=1)
difference2 = datetime.timedelta(weeks=-2)

# %-formatting prints the same text on Python 2 and Python 3.
print("One day in the future is: %s" % (now + difference1,))
#=> One day in the future is: 2003-08-07

print("Two weeks in the past is: %s" % (now + difference2,))
#=> Two weeks in the past is: 2003-07-23

print(datetime.date(2003, 8, 6) - datetime.date(2000, 8, 6))
#=> 1095 days, 0:00:00

#-----------------------------
# The month is written as 1, not 01: a leading zero is an octal literal on
# Python 2 and a syntax error on Python 3.
birthtime = datetime.datetime(1973, 1, 18, 3, 45, 50)   # 1973-01-18 03:45:50

interval = datetime.timedelta(seconds=5, minutes=17, hours=2, days=55)
then = birthtime + interval

print("Then is %s" % then.ctime())
#=> Then is Wed Mar 14 06:02:55 1973

print("Then is %s" % then.strftime("%A %B %d %I:%M:%S %p %Y"))
#=> Then is Wednesday March 14 06:02:55 AM 1973

#-----------------------------
when = datetime.datetime(1973, 1, 18) + datetime.timedelta(days=55)
print("Nat was 55 days old on: %s" % when.strftime("%m/%d/%Y").lstrip("0"))
#=> Nat was 55 days old on: 3/14/1973
Program to create make file in VMS/UNIX
This program is used to create a make file on either the VMS or the UNIX platform. It can be imported as a reusable module by other programs to generate make files on both platforms.
#Program to create executable in either VMS or UNIX platform:
import os
import sys


def make(project_name, param = ""):
    """Builds a project with unit test directives on.

    - project_name    the name of the project.
    - param           (optional) but can be set to any parameter
                      (taken by make.com) for VMS and only "nodebug" for UNIX.

    The build command is handed to os.system(); nothing is returned.
    """
    project_name = project_name.lower()
    if sys.platform == "OpenVMS":
        option = ''
        if param:
            option = ' /' + param
        make_command = '$ pre_c_defines = "define=(""UNITTEST"")"\n' \
                       '$ pre_cxx_defines = "define=(""UNITTEST"")"\n' \
                       '$ sym_comp_c_defines = "/define=UNITTEST"\n' \
                       '$ sym_comp_cxx_defines = ' \
                       '"/define=(UNITTEST,__USE_STD_IOSTREAM)"\n' \
                       '$ make ' + project_name + option + '\n'
    else:
        # UNIX platform
        option = ''
        # Compare by value: `is` tests object identity and only matches a
        # string literal by accident of interning.
        # NOTE(review): isdebug() is not defined in this snippet - it must be
        # supplied by the importing program; confirm where it lives.
        if param == "nodebug" and isdebug():
            # This is the same as release command which is currently undefined.
            option = 'unset sym_debug;. setdef.ksh > /dev/null\n'
        make_command = option + \
                       'export sym_proc_parm="define=UNITTEST"\n' \
                       'export sym_comp_parm="-DUNITTEST"\n' \
                       'rmake ' + project_name
    os.system(make_command)
    return
Python Code for creating Screen saver
# Bouncing-ball demo for the PyS60 (appuifw / graphics / e32) environment.
# The ball is drawn into an off-screen image that is blitted to a full-screen
# canvas every frame; arrow keys accelerate the ball and '#' saves a
# screenshot.
# NOTE(review): the original indentation was lost in extraction; the nesting
# below is reconstructed from the syntax - confirm against the original.
import appuifw
from graphics import *
import e32
from key_codes import *

class Keyboard(object):
    """Tracks which keys are currently held and how often each was pressed."""
    def __init__(self,onevent=lambda:None):
        # scancode -> 1 while the key is down, 0 after it was released
        self._keyboard_state={}
        # scancode -> number of presses not yet consumed by pressed()
        self._downs={}
        # callback invoked at the end of every handled event
        self._onevent=onevent
    def handle_event(self,event):
        """Update the key state from a canvas key event dictionary."""
        if event['type'] == appuifw.EEventKeyDown:
            code=event['scancode']
            # count a press only on the up->down transition
            if not self.is_down(code):
                self._downs[code]=self._downs.get(code,0)+1
            self._keyboard_state[code]=1
        elif event['type'] == appuifw.EEventKeyUp:
            self._keyboard_state[event['scancode']]=0
        self._onevent()
    def is_down(self,scancode):
        """Return the stored state for scancode (0 if never seen)."""
        return self._keyboard_state.get(scancode,0)
    def pressed(self,scancode):
        """Consume one pending press of scancode; True if there was one."""
        if self._downs.get(scancode,0):
            self._downs[scancode]-=1
            return True
        return False

keyboard=Keyboard()

appuifw.app.screen='full'
img=None
def handle_redraw(rect):
    """Copy the off-screen image to the canvas once the image exists."""
    if img:
        canvas.blit(img)
appuifw.app.body=canvas=appuifw.Canvas(
    event_callback=keyboard.handle_event,
    redraw_callback=handle_redraw)
img=Image.new(canvas.size)

running=1
def quit():
    """Exit-key handler: clear the flag that keeps the frame loop running."""
    global running
    running=0

appuifw.app.exit_key_handler=quit

# ball position starts at the centre of the image, at rest
location=[img.size[0]/2,img.size[1]/2]
speed=[0.,0.]
blobsize=16
# largest x / y the ball position may take before bouncing
xs,ys=img.size[0]-blobsize,img.size[1]-blobsize
gravity=0.03
acceleration=0.05

import time
start_time=time.clock()
n_frames=0
# render the help text once into its own image; it is blitted every frame
labeltext=u'Use arrows to move ball'
textrect=img.measure_text(labeltext, font='normal')[0]
text_img=Image.new((textrect[2]-textrect[0],textrect[3]-textrect[1]))
text_img.clear(0)
text_img.text((-textrect[0],-textrect[1]),labeltext,fill=0xffffff,font='normal')

while running:
    img.clear(0)
    img.blit(text_img, (0,0))
    img.point((location[0]+blobsize/2,location[1]+blobsize/2),
              0x00ff00,width=blobsize)
    handle_redraw(())
    # give the active scheduler a chance to deliver pending events
    e32.ao_yield()
    # damping on both axes, then gravity on the vertical axis
    speed[0]*=0.999
    speed[1]*=0.999
    speed[1]+=gravity
    location[0]+=speed[0]
    location[1]+=speed[1]
    # bounce off each edge: mirror the position back inside and reverse
    # (and reduce) the velocity component that hit the edge
    if location[0]>xs:
        location[0]=xs-(location[0]-xs)
        speed[0]=-0.80*speed[0]
        speed[1]=0.90*speed[1]
    if location[0]<0:
        location[0]=-location[0]
        speed[0]=-0.80*speed[0]
        speed[1]=0.90*speed[1]
    if location[1]>ys:
        location[1]=ys-(location[1]-ys)
        speed[0]=0.90*speed[0]
        speed[1]=-0.80*speed[1]
    if location[1]<0:
        location[1]=-location[1]
        speed[0]=0.90*speed[0]
        speed[1]=-0.80*speed[1]

    # held arrow keys accelerate the ball
    if keyboard.is_down(EScancodeLeftArrow): speed[0] -= acceleration
    if keyboard.is_down(EScancodeRightArrow): speed[0] += acceleration
    if keyboard.is_down(EScancodeDownArrow): speed[1] += acceleration
    if keyboard.is_down(EScancodeUpArrow): speed[1] -= acceleration
    # one screenshot per press of the hash key
    if keyboard.pressed(EScancodeHash):
        filename=u'e:\\screenshot.png'
        canvas.text((0,32),u'Saving screenshot to:',fill=0xffff00)
        canvas.text((0,48),filename,fill=0xffff00)
        img.save(filename)
    n_frames+=1
end_time=time.clock()
total=end_time-start_time

# NOTE(review): divides by total and by n_frames - both are assumed non-zero.
print "%d frames, %f seconds, %f FPS, %f ms/frame."%(n_frames,total,
                                                     n_frames/total,
                                                     total/n_frames*1000.)
Run Application from host
# Ways to start an external application. The sections are alternatives:
# os.execl() REPLACES the current Python process and does not return on
# success, so the statements after it run only if it is removed.
import os
import subprocess

# execl(path, arg0, arg1, ...): arg0 is the program's own name (argv[0]).
# The document to open must therefore be passed as the next argument.
os.execl("c:/Windows/Notepad.exe", "c:/Windows/Notepad.exe", "c:/userlog.txt")
print("Running notepad")

#or
# subprocess.call() starts the program and waits for it to finish.
subprocess.call("c:/Windows/Notepad.exe")
print("Running notepad")

#starting a Web browser:
os.system('/usr/bin/firefox')
os.system(r'c:\"Program Files"\"Mozilla Firefox"\firefox.exe')

#Windows specific function os.startfile:
# The path must not start with a blank, otherwise the file is not found.
os.startfile(r'c:\Program Files\Mozilla Firefox\firefox.exe')
Multithreading in Python
This code gives a clear view of how Python makes multithreaded programming simpler than it is in many other programming languages.
import sys
import threading
import time
import traceback

try:
    import Queue            # Python 2 module name
except ImportError:
    import queue as Queue   # the module was renamed in Python 3

#Globals (start with a captial letter)
Qin = Queue.Queue()     # work items: [flag, data] pairs
Qout = Queue.Queue()    # results produced by the workers
Qerr = Queue.Queue()    # error tuples produced by err_msg()
Pool = []               # worker threads that are currently started


def err_msg():
    """Describe the exception being handled as (traceback, type, value) strings.

    sys.exc_info() is used for all three parts because it is per-thread;
    sys.exc_type / sys.exc_value are process-global and deprecated.
    """
    exc_type, exc_value, trace = sys.exc_info()
    try:
        value_text = str(exc_value)
    except Exception:
        # an exception whose __str__ fails must not hide the original error
        value_text = ''
    return str(traceback.format_tb(trace)), str(exc_type), value_text


def get_errors():
    """Yield every error tuple currently queued, without blocking."""
    try:
        while 1:
            yield Qerr.get_nowait()
    except Queue.Empty:
        pass


def process_queue():
    """Worker loop: transform queued items until a 'stop' flag is received."""
    flag = 'ok'
    while flag != 'stop':
        try:
            flag, item = Qin.get()  #will wait here!
            if flag == 'ok':
                newdata = 'new' + item
                Qout.put(newdata)
        except Exception:
            # report the failure and keep the worker alive
            Qerr.put(err_msg())


def start_threads(amount=5):
    """Start `amount` worker threads and remember them in Pool."""
    for i in range(amount):
        thread = threading.Thread(target=process_queue)
        thread.start()
        Pool.append(thread)


def put(data, flag='ok'):
    """Queue `data` for processing."""
    Qin.put([flag, data])


def get():
    """Return the next result, blocking until one is available."""
    return Qout.get()  #will wait here!


def get_all():
    """Yield every result currently queued, without blocking."""
    try:
        while 1:
            yield Qout.get_nowait()
    except Queue.Empty:
        pass


def stop_threads():
    """Ask every worker to stop and return once all of them have finished."""
    # one stop message per worker; each worker consumes exactly one
    for _ in range(len(Pool)):
        Qin.put(('stop', None))
    # join() blocks until the thread has ended - no polling or sleeping,
    # and no deletion from Pool while it is being enumerated
    while Pool:
        Pool.pop().join()


#STANDARD use:
for i in ('b', 'c'):
    put(i)
start_threads()
stop_threads()
for i in get_all():
    print(i)
for i in get_errors():
    print(i)

#POOL use
#put element into input queue
put('a')

#setup threads -- will run forever as a pool until you shutdown
start_threads()
for i in ('b', 'c'):
    put(i)

#get an element from output queue
print(get())

#put even more data in, 7 causes an error
for i in ('d', 'e', 7):
    put(i)
#get whatever is available
for i in get_all():
    print(i)

#stop_threads only returns when all threads have stopped
stop_threads()
print('__threads finished last data available__')
for i in get_all():
    print(i)
for i in get_errors():
    print(i)

#starting up threads again
start_threads()
put('f')
stop_threads()
print('__threads finished(again) last data available__')
for i in get_all():
    print(i)
for i in get_errors():
    print(i)
This utility determines the size of a folder and its subfolders in MB
This simple Python snippet reports the total size, in MB, of a parent folder including all of its subfolders. It is built on Python's standard os module.
# determine size of a given folder in MBytes
import os


def get_folder_size(folder):
    """Return the total size in bytes of all files in `folder` and below.

    Files whose size cannot be read (for example a broken symbolic link or
    a file deleted during the scan) are skipped instead of aborting the
    whole scan. A folder that does not exist yields 0, because os.walk()
    produces nothing for it.
    """
    total = 0
    for path, dirs, files in os.walk(folder):
        for name in files:
            try:
                total += os.path.getsize(os.path.join(path, name))
            except OSError:
                continue
    return total


# pick a folder you have ...
folder = 'D:\\zz1'
folder_size = get_folder_size(folder)

print("Folder = %0.1f MB" % (folder_size/(1024*1024.0)))
Using UNIX Environment Variables From Python
# Run a small shell script that changes environment variables before it
# starts a program ("sample" stands for the program to run).
import os
import tempfile

command = "unset TEST " + "\n" \
          "export GLOBAL_VALUE='global_value'" + "\n" \
          "sample\n"

# mkstemp() returns an absolute path, so the shell can execute the script
# directly; a bare name such as "temp.tmp" is searched on PATH, which does
# not normally contain the current directory.
fd, scr_name = tempfile.mkstemp(suffix=".tmp", dir=os.getcwd())
try:
    scr_file = os.fdopen(fd, 'w')
    try:
        #write everything to the script file
        scr_file.write(command)
    finally:
        scr_file.close()
    os.chmod(scr_name, 0o744)  # mode -rwxr--r--
    status = os.system(scr_name)
finally:
    # remove the script even when writing or running it failed
    os.remove(scr_name)
CVS Macro for Committed Files
# CVS GUI macro: copies the files a user committed since a given date into
# a "delivery" directory tree.
# NOTE(review): the original indentation was lost in extraction; the nesting
# below (in particular whether the prompts are nested) is reconstructed -
# confirm against the original macro.
from cvsgui.Macro import *
from cvsgui.CvsEntry import *
from cvsgui.ColorConsole import *
import cvsgui.App
import os.path, os, shutil
import zipfile, string
import re
from cvsgui.Persistent import *
from cvsgui.MenuMgr import *
from cvsgui.SafeTk import *

class MyConfig(Macro):
    """Macro "My Utility" (menu "My Menu") that collects committed files."""
    def __init__(self):
        Macro.__init__(self, "My Utility", MACRO_SELECTION, 0, "My Menu")
        # persistent settings; each holds the last value the user entered
        self.m_path = Persistent("PY_PATH", "select the destination folder", 1)
        self.m_date = Persistent("PY_DATE", "dd-mmm-yyyy", 1)
        self.m_name = Persistent("PY_NAME","name",1)

    def OnCmdUI(self, cmdui):
        # the menu entry is always enabled
        cmdui.Enable(1)

    def Run(self):
        """Prompt for path, date and user, then copy the matching files."""
        msg = "Please enter the path for delivery :"
        msg1 = "Please enter the Date from when you want modified files :"
        msg2 = "Please enter the user name whose commited files have to be created in directory structure"
        title = "Path"
        outputpath = str(self.m_path)
        datestr = str(self.m_date)
        namestr = str(self.m_name)
        # each accepted, non-empty answer is stored in its persistent setting
        res, outputpath = cvsgui.App.PromptEditMessage(msg, outputpath, title)
        if res and len(outputpath) > 0:
            self.m_path << outputpath
        res,datestr = cvsgui.App.PromptEditMessage(msg1, datestr, title)
        if res and len(datestr) > 0:
            self.m_date << datestr
        res,namestr = cvsgui.App.PromptEditMessage(msg2, namestr, title)
        if res and len(namestr) > 0:
            self.m_name << namestr
        cvs = Cvs(1)
        console = ColorConsole()
        console << kMagenta << datestr << "\n" << kNormal
        console << kMagenta << "Username:" << namestr << "\n" << kNormal
        # NOTE(review): _cvsgui is not imported explicitly in this snippet;
        # presumably it must come from one of the star imports - verify.
        okCancel = _cvsgui.PromptMessage("Shall I Continue", "Message", "OK", "Cancel")
        if okCancel == 0:
            return
        # history records for the user since the date (-x types A, C and M)
        code, out, err = cvs.Run("history", "-xACM", "-D%s" % datestr, "-u%s" % namestr)
        lines= string.split(out, '\n')
        console << kMagenta << "Copying files : " <<"\n"
        for line in lines:
            #console << kMagenta << line << "\n" << kNormal
            # group 1 is used as the file name, group 2 (must start with
            # "PW_Portal") as the directory
            mobj = re.match( "^[MCA] \d\d\d\d-\d\d-\d\d \d\d:\d\d \+0000 [a-z_]+ +[\d.]+ +(.*) +(PW_Portal.*) +==", line)
            if mobj:
                file_name = mobj.group(1).strip()
                dir_name = mobj.group(2).strip()
                #console << kBlue << dir_name << "/" << file_name << "\n" << kNormal
                # source files are read from the fixed directory c:/work/
                srcname = os.path.join("c:/work/", dir_name + '/'+ file_name)
                targetname = os.path.join(outputpath, "delivery"+ '/'+ dir_name)
                if not os.path.exists(targetname):
                    os.makedirs(targetname)
                shutil.copy(srcname, targetname)
                console << kGreen << srcname << " to " << targetname <<"\n" << kNormal

# instantiating the macro at import time (presumably this registers it with
# the GUI - verify against the cvsgui documentation)
MyConfig()
Generating unique id's
The script is written in Python. It assigns a unique identifier to every database name, and the same approach can be reused to assign unique IDs to any fixed set of data.
# Fixed database name -> group id table, built once at import time.
_GROUP_IDS = {
    'pdb': 0,
    'vast': 1,
    'taxonomy': 2,
    'genbank': 3,
    'dbSNP': 4,
    'enzyme': 5,
    'uniprot': 6,
    'go': 7,
    'interpro': 8,
    'rebase': 9,
    'prosite': 10,
    'pdb-hetero': 11,
}


def generate_grpid(dbname):
    """Return the unique integer id assigned to the database `dbname`.

    Raises KeyError when `dbname` is not one of the known database names
    (the names are case-sensitive).
    """
    return _GROUP_IDS[dbname]
Bind Key action: Return
Bind Key action: Return
from Tkinter import *

root = Tk()


def greet(*ignore):
    """Key handler: the event object passed by Tk is ignored."""
    print('Hello World')


# '<Return>' is the Tk event sequence for the Return/Enter key; an empty
# sequence string is not a valid binding.
root.bind('<Return>', greet)
root.mainloop()
Write CSV File Using Python
import csv

# The with-block closes the file, which also flushes the rows to disk.
# NOTE(review): the csv documentation asks for mode 'wb' on Python 2 and
# newline='' on Python 3 - adjust for the interpreter in use.
with open('eggs.csv', 'w') as csv_file:
    # QUOTE_MINIMAL lives in the csv module and must be qualified
    spamWriter = csv.writer(csv_file, delimiter=' ',
                            quotechar='|', quoting=csv.QUOTE_MINIMAL)
    spamWriter.writerow(['Spam'] * 5 + ['Baked Beans'])
    spamWriter.writerow(['Spam', 'Lovely Spam', 'Wonderful Spam'])
Read CSV File using Python
import csv

# The with-block closes the file as soon as all rows have been read.
with open('eggs.csv') as csv_file:
    spamReader = csv.reader(csv_file, delimiter=' ', quotechar='|')
    for row in spamReader:
        print(', '.join(row))
# Spam, Spam, Spam, Spam, Spam, Baked Beans
# Spam, Lovely Spam, Wonderful Spam
Create an HTML message with an alternative plain text version
Here is a Python Program to create an HTML message with an alternative plain text version
#!/usr/bin/env python

import smtplib

from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

# me == my email address
# you == recipient's email address
me = "my@email.com"
you = "your@email.com"

# Create message container - the correct MIME type is multipart/alternative.
msg = MIMEMultipart('alternative')
msg['Subject'] = "Link"
msg['From'] = me
msg['To'] = you

# Create the body of the message (a plain-text and an HTML version).
text = "Hi!\nHow are you?\nHere is the link you wanted:\nhttp://www.python.org"
# The HTML markup carries the same three sentences as the plain-text part.
html = """\
<html>
  <head></head>
  <body>
    <p>Hi!<br>
       How are you?<br>
       Here is the <a href="http://www.python.org">link</a> you wanted.
    </p>
  </body>
</html>
"""

# Record the MIME types of both parts - text/plain and text/html.
part1 = MIMEText(text, 'plain')
part2 = MIMEText(html, 'html')

# Attach parts into message container.
# According to RFC 2046, the last part of a multipart message, in this case
# the HTML message, is best and preferred.
msg.attach(part1)
msg.attach(part2)

# Send the message via local SMTP server.
s = smtplib.SMTP('localhost')
try:
    # sendmail function takes 3 arguments: sender's address, recipient's address
    # and message to send - here it is sent as one string.
    s.sendmail(me, you, msg.as_string())
finally:
    # end the SMTP session even when sending failed
    s.quit()
To Send the entire contents of directory as an email Message.
Here is a Python Program to send the entire contents of a directory as an email message
#!/usr/bin/env python

"""Send the contents of a directory as a MIME message."""

import os
import sys
import smtplib
# For guessing MIME type based on file name extension
import mimetypes

from optparse import OptionParser

from email import encoders
from email.message import Message
from email.mime.audio import MIMEAudio
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

COMMASPACE = ', '


def _build_attachment(path):
    """Return a MIME part holding the contents of the file at `path`."""
    # Guess the content type based on the file's extension.  Encoding
    # will be ignored, although we should check for simple things like
    # gzip'd or compressed files.
    ctype, encoding = mimetypes.guess_type(path)
    if ctype is None or encoding is not None:
        # No guess could be made, or the file is encoded (compressed), so
        # use a generic bag-of-bits type.
        ctype = 'application/octet-stream'
    maintype, subtype = ctype.split('/', 1)
    if maintype == 'text':
        # Note: we should handle calculating the charset
        with open(path) as fp:
            return MIMEText(fp.read(), _subtype=subtype)
    # everything else is read as raw bytes; `with` closes the file even
    # when reading fails
    with open(path, 'rb') as fp:
        payload = fp.read()
    if maintype == 'image':
        return MIMEImage(payload, _subtype=subtype)
    if maintype == 'audio':
        return MIMEAudio(payload, _subtype=subtype)
    part = MIMEBase(maintype, subtype)
    part.set_payload(payload)
    # Encode the payload using Base64
    encoders.encode_base64(part)
    return part


def main():
    """Parse the command line, build the message, then send or store it.

    Exits with status 1 after printing the help text when the sender or
    the recipients are missing.
    """
    parser = OptionParser(usage="""\
Send the contents of a directory as a MIME message.

Usage: %prog [options]

Unless the -o option is given, the email is sent by forwarding to your local
SMTP server, which then does the normal delivery process.  Your local machine
must be running an SMTP server.
""")
    parser.add_option('-d', '--directory',
                      type='string', action='store',
                      help="""Mail the contents of the specified directory,
                      otherwise use the current directory.  Only the regular
                      files in the directory are sent, and we don't recurse to
                      subdirectories.""")
    parser.add_option('-o', '--output',
                      type='string', action='store', metavar='FILE',
                      help="""Print the composed message to FILE instead of
                      sending the message to the SMTP server.""")
    parser.add_option('-s', '--sender',
                      type='string', action='store', metavar='SENDER',
                      help='The value of the From: header (required)')
    parser.add_option('-r', '--recipient',
                      type='string', action='append', metavar='RECIPIENT',
                      default=[], dest='recipients',
                      help='A To: header value (at least one required)')
    opts, args = parser.parse_args()
    if not opts.sender or not opts.recipients:
        parser.print_help()
        sys.exit(1)
    directory = opts.directory
    if not directory:
        directory = '.'
    # Create the enclosing (outer) message
    outer = MIMEMultipart()
    outer['Subject'] = 'Contents of directory %s' % os.path.abspath(directory)
    outer['To'] = COMMASPACE.join(opts.recipients)
    outer['From'] = opts.sender
    outer.preamble = 'You will not see this in a MIME-aware mail reader.\n'

    for filename in os.listdir(directory):
        path = os.path.join(directory, filename)
        # only regular files are attached; subdirectories are not entered
        if not os.path.isfile(path):
            continue
        msg = _build_attachment(path)
        # Set the filename parameter
        msg.add_header('Content-Disposition', 'attachment', filename=filename)
        outer.attach(msg)
    # Now send or store the message
    composed = outer.as_string()
    if opts.output:
        with open(opts.output, 'w') as fp:
            fp.write(composed)
    else:
        s = smtplib.SMTP('localhost')
        try:
            s.sendmail(opts.sender, opts.recipients, composed)
        finally:
            s.quit()


if __name__ == '__main__':
    main()
To send mail containing Pictures | Python Example
Here is a Python Program to send a MIME message containing a bunch of family pictures that may be residing in a directory
# Import smtplib for the actual sending function
import smtplib

# Here are the email package modules we'll need
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart

COMMASPACE = ', '

# Placeholders - replace them with real values before running:
# me == the sender's email address
# family = the list of all recipients' email addresses
# pngfiles = the paths of the PNG files to attach
me = "my@email.com"
family = ["first@email.com", "second@email.com"]
pngfiles = []

# Create the container (outer) email message.
msg = MIMEMultipart()
msg['Subject'] = 'Our family reunion'
msg['From'] = me
msg['To'] = COMMASPACE.join(family)
msg.preamble = 'Our family reunion'

# Assume we know that the image files are all in PNG format
for png_path in pngfiles:
    # Open the files in binary mode.  Let the MIMEImage class automatically
    # guess the specific image type.
    fp = open(png_path, 'rb')
    try:
        img = MIMEImage(fp.read())
    finally:
        fp.close()
    msg.attach(img)

# Send the email via our own SMTP server.
s = smtplib.SMTP('localhost')
try:
    s.sendmail(me, family, msg.as_string())
finally:
    s.quit()
How to create and send a simple text message
Here is a Python Program to create and send a simple text message:
# Import smtplib for the actual sending function
import smtplib

# Import the email modules we'll need
from email.mime.text import MIMEText

# Placeholders - replace them with real values before running:
# textfile == path of the text file to send
# me == the sender's email address
# you == the recipient's email address
textfile = "message.txt"
me = "my@email.com"
you = "your@email.com"

# Open a plain text file for reading.  For this example, assume that
# the text file contains only ASCII characters.
fp = open(textfile, 'rb')
try:
    # Create a text/plain message
    msg = MIMEText(fp.read())
finally:
    fp.close()

msg['Subject'] = 'The contents of %s' % textfile
msg['From'] = me
msg['To'] = you

# Send the message via our own SMTP server, but don't include the
# envelope header.
s = smtplib.SMTP('localhost')
try:
    s.sendmail(me, [you], msg.as_string())
finally:
    s.quit()
Close a file after using | Python Example
import sys


def open_file(file_name, mode):
    """Open a file.

    Returns the open file object. When the file cannot be opened the error
    is reported, the user is asked to press enter, and the program exits.
    """
    try:
        the_file = open(file_name, mode)
    except IOError as e:
        print("Unable to open the file %s Ending program.\n%s" % (file_name, e))
        raw_input("\n\nPress the enter key to exit.")
        sys.exit()
    else:
        return the_file


def next_line(the_file):
    """Return next line from the trivia file, formatted."""
    line = the_file.readline()
    # a "/" in the file marks a line break in the displayed text
    line = line.replace("/", "\n")
    return line


trivia_file = open_file("trivia.txt", "r")
try:
    title = next_line(trivia_file)
finally:
    # close the file even when reading the line failed
    trivia_file.close()
Python Program to Close the File
Here is a Python Program to Close the File
# Open files consume system resources, and depending on the file mode, # other programs may not be able to access them. # It's important to close files as soon as you're finished with them, as shown in # f = open("/music/_singles/kairo.mp3", "rb") print f.closed f.close() print f print f.closed # The closed attribute of a file object indicates whether the object has # a file open or not. In this case, the file is still open (closed is False).
Python Program to Read Binary Files
# Read the last 128 bytes of a binary file and show the file position
# before and after each step.
f = open("/music/_singles/kairo.mp3", "rb")
try:
    print(f)
    print(f.tell())
    # whence=2 makes the offset relative to the end of the file
    f.seek(-128, 2)
    print(f.tell())
    tagData = f.read(128)
    print(tagData)
    print(f.tell())
finally:
    # release the file handle even when seeking or reading failed
    f.close()
Python Program to Work with Directories
# os.path bundles helpers for working with file and directory names.
# This snippet joins path components and expands the home directory.
import os

folder_with_separator = "c:\\yourfolder\\"
folder_without_separator = "c:\\yourfolder"

print(os.path.join(folder_with_separator, "yourfile.txt"))
print(os.path.join(folder_without_separator, "yourfile.txt"))

# "~" stands for the current user's home directory.
home_directory = os.path.expanduser("~")
print(os.path.join(home_directory, "Python"))

# os.path refers to a module, and which module that is depends on the
# platform: os hides the platform differences by binding path to a
# platform-specific module, much as getpass binds a platform-specific
# function.
Python Program to Change Directory
Here is a Python Program to Change Directory
import os

# Make '/server/accesslogs' the current working directory of the process.
os.chdir('/server/accesslogs')

#Be sure to use the "import os" style instead of "from os import *". This will keep
#os.open() from shadowing the builtin open() function which operates much
#differently.
Listing Directories with glob | Python Examples
# These statements come from an interactive session: each call returns a
# list that is not stored or printed, so running this as a script shows
# nothing.
import os
# all entries directly inside c:\
os.listdir("c:\\")
import glob
# only the entries of c:\ whose names match *.mp3
glob.glob('c:\\*.mp3')
# NOTE(review): identical to the previous call - presumably a different
# pattern was intended here; confirm.
glob.glob('c:\\*.mp3')
# a wildcard may also stand for a directory level: .mp3 files in every
# direct subdirectory of c:\c
glob.glob('c:\\c\\*\\*.mp3')
Python Program to List Directories
import os

dirname = "c:\\"
# read the directory once and classify its entries below
entries = os.listdir(dirname)

# The complete lists are printed. The "[3]" / "[4]" that followed the list
# expressions were footnote markers; as code they picked a single element
# and raised IndexError for short directories.
# regular files only
print([f for f in entries if os.path.isfile(os.path.join(dirname, f))])
# directories only
print([f for f in entries if os.path.isdir(os.path.join(dirname, f))])
Mail Alert using SMTP in python
import smtplib
import socket


#to send the email to concerned persons
def Send_Mail(TEXT):
    """Send TEXT as an alert mail with the subject "ALERT!".

    Failures are reported on standard output; nothing is raised for
    connection or SMTP errors and nothing is returned.
    """
    sender = 'sender_mail_address in single quotes '
    receivers = ['firstperson mailaddress in single quotes ','secondperson mail address in single quotes ']
    # Prepare actual message
    # The empty line ends the header section; without it TEXT would be
    # treated as part of the headers instead of the body.
    message = "Subject:ALERT!\n\n" + TEXT
    try:
        smtpObj = smtplib.SMTP(' SMTP MAIL SERVER in single quotes ',25)
        try:
            smtpObj.sendmail(sender, receivers, message)
        finally:
            # close the connection even when sending failed
            smtpObj.close()
    except smtplib.SMTPConnectError:
        print("Error: unable to send email by connect")
    except smtplib.SMTPException:
        print("Error: unable to send email")
    except socket.error:
        # unknown host or refused connection is raised as a socket error,
        # not as an SMTPException
        print("Error: unable to send email by connect")


#main
Send_Mail("enter you text here..")
Inserting into MySQL Database Using Python Program
import sys

import MySQLdb

db = MySQLdb.connect(host="localhost", user="python-test", passwd="python", db="python-test")

try:
    try:
        title = raw_input("Please enter a book title: ")
        if title == "":
            # an empty title ends the program (a bare `exit` does nothing)
            sys.exit()
        author = raw_input("Please enter the author's name: ")
        pubdate = int(raw_input("Enter the publication year: "))
    except ValueError:
        # the publication year was not a number
        print("Invalid value")
        sys.exit(1)

    print("Title: [" + title + "]")
    print("Author: [" + author + "]")
    print("Publication Date: " + str(pubdate))

    cursor = db.cursor()
    # The values are passed separately from the statement so that the
    # driver escapes them; quotes in a title or name can neither break
    # the statement nor inject SQL.
    stmt = ("INSERT INTO Books (BookName, BookAuthor, PublicationDate) "
            "VALUES (%s, %s, %s)")
    cursor.execute(stmt, (title, author, pubdate))
    print("Record added!")
    cursor.close()
    db.commit()
finally:
    db.close()
Updating a MySQL Database with Python Language | Python Example
import MySQLdb
import MySQLdb.cursors


def get_column_name(data, prompt, names):
    """Let the user pick one column of `data` by number.

    data    mapping (or iterable) whose keys are the column names
    prompt  text shown when asking for the number
    names   list that receives the column names in the displayed order

    Returns the chosen 1-based position, so the selected column is
    names[result - 1] when `names` was empty on entry. The menu is shown
    again until a valid number is entered.
    """
    columns = list(data)
    # fill the caller's list once, not once per retry
    names.extend(columns)
    value = -1
    while value == -1:
        for idx, col in enumerate(columns, 1):
            print(str(idx) + ": " + col)
        try:
            value = int(raw_input(prompt))
        except ValueError:
            # not a number: ask again instead of crashing
            value = -1
            continue
        if value < 1 or value > len(columns):
            value = -1
    return value


def _quote_identifier(name):
    """Return `name` quoted as a MySQL identifier."""
    return "`" + name.replace("`", "``") + "`"


conn = MySQLdb.Connect(
    host='localhost', user='python-test',
    passwd='python', db='python-test')
cursor = conn.cursor(cursorclass=MySQLdb.cursors.DictCursor)
cursor.execute("SELECT * FROM books")
data = cursor.fetchone()
if data is None:
    # without a row there are no column names to choose from
    cursor.close()
    conn.close()
    raise SystemExit("The books table is empty - nothing to update")

names = []
old_value = get_column_name(data, "Which column do you want to change records for? ", names)
names = []
new_value = get_column_name(data, "Which column do you want to change records to? ", names)

old_val = raw_input("What value do you want to change for " + names[old_value-1] + ": ")
new_val = raw_input("What value do you want to change to for " + names[new_value-1] + ": ")

# Column names come from the table itself and are quoted as identifiers;
# the user-entered values are passed as parameters so the driver escapes
# them.
stmt = "UPDATE books SET %s = %%s WHERE %s = %%s" % (
    _quote_identifier(names[new_value-1]),
    _quote_identifier(names[old_value-1]))
print(stmt)
cursor.execute(stmt, (new_val, old_val))
print("Rows affected: " + str(cursor.rowcount))
cursor.close()
conn.commit()
conn.close()
Reading Data from MySQL Database | Python Example
# Print every column of every row of the books table.
import MySQLdb

connection = MySQLdb.connect(host="localhost", user="python-test",
                             passwd="python", db="python-test")
book_cursor = connection.cursor()

query = "SELECT * from books"
book_cursor.execute(query)

# fetchall() returns all result rows at once
for record in book_cursor.fetchall():
    print("Row: ")
    for field in record:
        print("Column: %s" % field)
    print("End of Row")

print("Number of rows returned: %d" % book_cursor.rowcount)

book_cursor.close()
connection.close()
LED Control Using Python Program
#/*
#Python and Tkinter Programming
#John E. Grayson
#ISBN: 1884777813
#Publisher: Manning
#*/
# NOTE(review): the original indentation was lost in extraction; the nesting
# below is reconstructed from the syntax - confirm against the book listing.

from Tkinter import *

# LED shapes
SQUARE = 1
ROUND = 2
ARROW = 3

# directions an ARROW led can point to
POINT_DOWN = 0
POINT_UP = 1
POINT_RIGHT = 2
POINT_LEFT = 3

# LED states; update() maps each one to a fill color
STATUS_OFF = 1
STATUS_ON = 2
STATUS_WARN = 3
STATUS_ALARM = 4
STATUS_SET = 5

class DummyClass:
    # empty class used only as an attribute container
    pass

# shared color table
Color = DummyClass()

Color.PANEL = '#545454'
Color.OFF = '#656565'
Color.ON = '#00FF33'
Color.WARN = '#ffcc00'
Color.ALARM = '#ff4422'

class LED:
    """A status light drawn on a Canvas inside its own Frame (self.frame)."""
    def __init__(self, master=None, width=25, height=25,
                 appearance=FLAT,
                 status=STATUS_ON, bd=1,
                 bg=None,
                 shape=SQUARE, outline="",
                 blink=0, blinkrate=1,
                 orient=POINT_UP,
                 takefocus=0):
        # preserve attributes
        self.master = master
        self.shape = shape
        self.onColor = Color.ON
        self.offColor = Color.OFF
        self.alarmColor = Color.ALARM
        self.warningColor = Color.WARN
        self.specialColor = '#00ffdd'
        self.status = status
        self.blink = blink
        self.blinkrate = int(blinkrate)
        # on: whether a blinking led is in its lit phase;
        # onState: the status shown during the lit phase
        self.on = 0
        self.onState = None

        if not bg:
            bg = Color.PANEL

        ## Base frame to contain light
        self.frame=Frame(master, relief=appearance, bg=bg, bd=bd,
                         takefocus=takefocus)

        basesize = width
        d = center = int(basesize/2)

        if self.shape == SQUARE:
            self.canvas=Canvas(self.frame, height=height, width=width,
                               bg=bg, bd=0, highlightthickness=0)

            self.light=self.canvas.create_rectangle(0, 0, width, height,
                                                    fill=Color.ON)
        elif self.shape == ROUND:
            r = int((basesize-2)/2)
            # the round canvas is square: width is used for both sides
            self.canvas=Canvas(self.frame, width=width, height=width,
                               highlightthickness=0, bg=bg, bd=0)
            if bd > 0:
                self.border=self.canvas.create_oval(center-r, center-r,
                                                    center+r, center+r)
                # the light is drawn inside the border ring
                r = r - bd
            self.light=self.canvas.create_oval(center-r-1, center-r-1,
                                               center+r, center+r, fill=Color.ON,
                                               outline=outline)
        else: # Default is an ARROW
            self.canvas=Canvas(self.frame, width=width, height=width,
                               highlightthickness=0, bg=bg, bd=0)
            x = d
            y = d
            if orient == POINT_DOWN:
                self.light=self.canvas.create_polygon(x-d,y-d, x,y+d,
                                                      x+d,y-d, x-d,y-d, outline=outline)
            elif orient == POINT_UP:
                self.light=self.canvas.create_polygon(x,y-d, x-d,y+d,
                                                      x+d,y+d, x,y-d, outline=outline)
            elif orient == POINT_RIGHT:
                self.light=self.canvas.create_polygon(x-d,y-d, x+d,y,
                                                      x-d,y+d, x-d,y-d, outline=outline)
            elif orient == POINT_LEFT:
                self.light=self.canvas.create_polygon(x-d,y, x+d,y+d,
                                                      x+d,y-d, x-d,y, outline=outline)

        self.canvas.pack(side=TOP, fill=X, expand=NO)
        # color the light for the initial status (and start blinking if set)
        self.update()

    # The four status setters below redraw at once only when the led is not
    # blinking.
    def turnon(self):
        self.status = STATUS_ON
        if not self.blink: self.update()

    def turnoff(self):
        self.status = STATUS_OFF
        if not self.blink: self.update()

    def alarm(self):
        self.status = STATUS_ALARM
        if not self.blink: self.update()

    def warn(self):
        self.status = STATUS_WARN
        if not self.blink: self.update()

    def set(self, color):
        """Show the caller-supplied color (status STATUS_SET)."""
        self.status = STATUS_SET
        self.specialColor = color
        # unlike the setters above, this redraws even while blinking
        self.update()

    def blinkon(self):
        """Start blinking, remembering the current status as the lit state."""
        if not self.blink:
            self.blink = 1
            self.onState = self.status
            self.update()

    def blinkoff(self):
        """Stop blinking and restore the remembered lit state."""
        if self.blink:
            self.blink = 0
            self.status = self.onState
            self.onState = None
            self.on = 0
            self.update()

    def blinkstate(self, blinkstate):
        """Switch blinking on for a true argument, off otherwise."""
        if blinkstate:
            self.blinkon()
        else:
            self.blinkoff()

    def update(self):
        """Redraw the light; while blinking, toggle it and reschedule itself."""
        # First do the blink, if set to blink
        if self.blink:
            if self.on:
                if not self.onState:
                    self.onState = self.status
                self.status = STATUS_OFF
                self.on = 0
            else:
                if self.onState:
                    self.status = self.onState # Current ON color
                self.on = 1

        if self.status == STATUS_ON:
            self.canvas.itemconfig(self.light, fill=self.onColor)
        elif self.status == STATUS_OFF:
            self.canvas.itemconfig(self.light, fill=self.offColor)
        elif self.status == STATUS_WARN:
            self.canvas.itemconfig(self.light, fill=self.warningColor)
        elif self.status == STATUS_SET:
            self.canvas.itemconfig(self.light, fill=self.specialColor)
        else:
            # any other status (including STATUS_ALARM) uses the alarm color
            self.canvas.itemconfig(self.light, fill=self.alarmColor)

        self.canvas.update_idletasks()

        if self.blink:
            # blinkrate is in seconds; after() expects milliseconds
            self.frame.after(self.blinkrate * 1000, self.update)

if __name__ == '__main__':
    class TestLEDs(Frame):
        """Demo frame: one row per led type, one led per state in each row."""
        def __init__(self, parent=None):
            # List of Colors and Blink On/Off
            states = [(STATUS_OFF, 0),
                      (STATUS_ON, 0),
                      (STATUS_WARN, 0),
                      (STATUS_ALARM, 0),
                      (STATUS_SET, 0),
                      (STATUS_ON, 1),
                      (STATUS_WARN, 1),
                      (STATUS_ALARM, 1),
                      (STATUS_SET, 1)]

            # List of LED types to display,
            # with sizes and other attributes
            leds = [(ROUND, 25, 25, FLAT, 0, None, ""),
                    (ROUND, 15, 15, RAISED, 1, None, ""),
                    (SQUARE, 20, 20, SUNKEN, 1, None, ""),
                    (SQUARE, 8, 8, FLAT, 0, None, ""),
                    (SQUARE, 8, 8, RAISED, 1, None, ""),
                    (SQUARE, 16, 8, FLAT, 1, None, ""),
                    (ARROW, 14, 14, RIDGE, 1, POINT_UP, ""),
                    (ARROW, 14, 14, RIDGE, 0, POINT_RIGHT, ""),
                    (ARROW, 14, 14, FLAT, 0, POINT_DOWN, "white")]

            # NOTE(review): the parent argument is not passed on to
            # Frame.__init__ - confirm whether that is intended.
            Frame.__init__(self) # Do superclass init
            self.pack()
            self.master.title('LED Example - Stage 1')

            # Iterate for each type of led
            for shape, w, h, app, bd, orient, outline in leds:
                frame = Frame(self, bg=Color.PANEL)
                frame.pack(anchor=N, expand=YES, fill=X)

                # Iterate for selected states
                for state, blink in states:
                    LED(frame, shape=shape, status=state,
                        width=w, height=h, appearance=app,
                        orient=orient, blink=blink, bd=bd,
                        outline=outline).frame.pack(side=LEFT,
                                                    expand=YES, padx=1, pady=1)

    TestLEDs().mainloop()
Make a Simple Calculator using Python Language | Python Example
try:
    from Tkinter import *   # Python 2 module name
except ImportError:
    from tkinter import *   # Python 3 module name
import Pmw
import string


class SLabel(Frame):
    """A two-sided label packed inside its own gray Frame.

    The left-hand label uses 'steelblue1' text and the right-hand label
    uses white text; both are packed so that they expand with the frame.
    """

    def __init__(self, master, leftl, rightl):
        """Create the frame under *master* showing *leftl* and *rightl*."""
        Frame.__init__(self, master, bg='gray40')
        self.pack(side=LEFT, expand=YES, fill=BOTH)
        Label(self, text=leftl, fg='steelblue1',
              font=("arial", 6, "bold"), width=5,
              bg='gray40').pack(side=LEFT, expand=YES, fill=BOTH)
        Label(self, text=rightl, fg='white',
              font=("arial", 6, "bold"), width=1,
              bg='gray40').pack(side=RIGHT, expand=YES, fill=BOTH)


class Key(Button):
    """A Button with calculator-key defaults that packs itself to the left."""

    def __init__(self, master, font=('arial', 8, 'bold'),
                 fg='white', width=5, borderwidth=5, **kw):
        """Create the button under *master*.

        The named defaults are merged into *kw* and everything is passed
        on to ``Button.__init__``; any other Button option may be given
        as a keyword argument.
        """
        kw['font'] = font
        kw['fg'] = fg
        kw['width'] = width
        kw['borderwidth'] = borderwidth
        # Direct call instead of the deprecated apply() builtin.
        Button.__init__(self, master, **kw)
        self.pack(side=LEFT, expand=NO, fill=NONE)


class Calculator(Frame):
    """Calculator window: a scrolled text display above eight rows of keys.

    Plain keys append their text to the pending expression; function keys
    are dispatched through ``self.actionDict``.  The pending expression is
    kept in ``self.current`` and handed to an ``Evaluator`` on Enter.
    """

    def __init__(self, parent=None):
        """Build the widgets and the action table.

        parent -- optional master widget; passed through to Frame.
        """
        # Pass the parent through (it used to be silently ignored).
        Frame.__init__(self, parent, bg='gray40')
        self.pack(expand=YES, fill=BOTH)
        self.master.title('Tkinter Toolkit TT-42')
        self.master.iconname('Tk-42')
        self.calc = Evaluator()         # This is our evaluator
        self.buildCalculator()          # Build the widgets
        # This is an incomplete dictionary - a good exercise!
        self.actionDict = {
            'second':  self.doThis,
            'mode':    self.doThis,
            'delete':  self.doThis,
            'alpha':   self.doThis,
            'stat':    self.doThis,
            'math':    self.doThis,
            'matrix':  self.doThis,
            'program': self.doThis,
            'vars':    self.doThis,
            'clear':   self.clearall,
            'sin':     self.doThis,
            'cos':     self.doThis,
            'tan':     self.doThis,
            'up':      self.doThis,
            'X1':      self.doThis,
            'X2':      self.doThis,
            'log':     self.doThis,
            'ln':      self.doThis,
            'store':   self.doThis,
            'off':     self.turnoff,
            'neg':     self.doThis,
            'enter':   self.doEnter,
        }
        self.current = ""

    def doThis(self, action):
        """Placeholder handler: report that *action* is not implemented."""
        # Single parenthesised argument prints the same on Python 2 and 3.
        print('"%s" has not been implemented' % action)

    def turnoff(self, *args):
        """Handler for the 'off' key: quit the Tk main loop."""
        self.quit()

    def clearall(self, *args):
        """Handler for the 'clear' key: drop the pending input and wipe
        the display."""
        self.current = ""
        self.display.component('text').delete(1.0, END)

    def doEnter(self, *args):
        """Evaluate the pending input and show any result in the display.

        Used both as the 'enter' key handler and as an event callback,
        hence the ignored *args.  The pending input is cleared afterwards.
        """
        result = self.calc.runpython(self.current)
        if result:
            self.display.insert(END, '\n')
            self.display.insert(END, '%s\n' % result, 'ans')
        self.current = ""

    def doKeypress(self, event):
        """Keep ``self.current`` in step with keys typed into the display.

        A backspace removes the last pending character; any other key
        appends ``event.char``.
        """
        if event.char == '\b':
            self.current = self.current[:-1]
        else:
            self.current = self.current + event.char

    def keyAction(self, key):
        """Handler for plain keys: echo *key* and append it to the input."""
        self.display.insert(END, key)
        self.current = self.current + key

    def evalAction(self, action):
        """Dispatch a function key through ``self.actionDict``.

        Unknown actions are ignored.  The lookup is done separately from
        the call so that a KeyError raised inside a handler is no longer
        swallowed.
        """
        handler = self.actionDict.get(action)
        if handler is not None:
            handler(action)

    def buildCalculator(self):
        """Create the display and the rows of keys.

        Each key is described by a tuple
        (key text, first label, second label, colour, type, value) where
        type is FUN (value is an action name for evalAction) or KEY
        (value is the text passed to keyAction).
        """
        FUN = 1                 # Designates a Function
        KEY = 0                 # Designates a Key
        KC1 = 'gray30'          # Dark Keys
        KC2 = 'gray50'          # Light Keys
        KC3 = 'steelblue1'      # Light Blue Key
        KC4 = 'steelblue'       # Dark Blue Key
        keys = [
            [('2nd',  '',     '',  KC3, FUN, 'second'),    # Row 1
             ('Mode', 'Quit', '',  KC1, FUN, 'mode'),
             ('Del',  'Ins',  '',  KC1, FUN, 'delete'),
             ('Alpha', 'Lock', '', KC2, FUN, 'alpha'),
             ('Stat', 'List', '',  KC1, FUN, 'stat')],
            [('Math', 'Test',  'A', KC1, FUN, 'math'),     # Row 2
             ('Mtrx', 'Angle', 'B', KC1, FUN, 'matrix'),
             ('Prgm', 'Draw',  'C', KC1, FUN, 'program'),
             ('Vars', 'YVars', '',  KC1, FUN, 'vars'),
             ('Clr',  '',      '',  KC1, FUN, 'clear')],
            [('X-1', 'Abs',   'D', KC1, FUN, 'X1'),        # Row 3
             ('Sin', 'Sin-1', 'E', KC1, FUN, 'sin'),
             ('Cos', 'Cos-1', 'F', KC1, FUN, 'cos'),
             ('Tan', 'Tan-1', 'G', KC1, FUN, 'tan'),
             ('^',   'PI',    'H', KC1, FUN, 'up')],
            [('X2', 'Root', 'I', KC1, FUN, 'X2'),          # Row 4
             (',',  'EE',   'J', KC1, KEY, ','),
             ('(',  '{',    'K', KC1, KEY, '('),
             (')',  '}',    'L', KC1, KEY, ')'),
             ('/',  '',     'M', KC4, KEY, '/')],
            [('Log', '10x',  'N', KC1, FUN, 'log'),        # Row 5
             ('7',   'Un-1', 'O', KC2, KEY, '7'),
             ('8',   'Vn-1', 'P', KC2, KEY, '8'),
             ('9',   'n',    'Q', KC2, KEY, '9'),
             ('X',   '[',    'R', KC4, KEY, '*')],
            [('Ln', 'ex', 'S', KC1, FUN, 'ln'),            # Row 6
             ('4',  'L4', 'T', KC2, KEY, '4'),
             ('5',  'L5', 'U', KC2, KEY, '5'),
             ('6',  'L6', 'V', KC2, KEY, '6'),
             ('-',  ']',  'W', KC4, KEY, '-')],
            [('STO', 'RCL', 'X', KC1, FUN, 'store'),       # Row 7
             ('1',   'L1',  'Y', KC2, KEY, '1'),
             ('2',   'L2',  'Z', KC2, KEY, '2'),
             ('3',   'L3',  '',  KC2, KEY, '3'),
             ('+',   'MEM', '"', KC4, KEY, '+')],
            [('Off',   '',      '',  KC1, FUN, 'off'),     # Row 8
             ('0',     '',      '',  KC2, KEY, '0'),
             ('.',     ':',     '',  KC2, KEY, '.'),
             ('(-)',   'ANS',   '?', KC2, FUN, 'neg'),
             ('Enter', 'Entry', '',  KC4, FUN, 'enter')]]

        self.display = Pmw.ScrolledText(
            self,
            hscrollmode='dynamic', vscrollmode='dynamic',
            hull_relief='sunken', hull_background='gray40',
            hull_borderwidth=10,
            text_background='honeydew4', text_width=16,
            text_foreground='black', text_height=6,
            text_padx=10, text_pady=10, text_relief='groove',
            text_font=('arial', 12, 'bold'))
        self.display.pack(side=TOP, expand=YES, fill=BOTH)
        self.display.tag_config('ans', foreground='white')

        # The event sequences had been stripped from these two bind()
        # calls (leaving '' and ' '), which Tk cannot bind.
        # NOTE(review): restored as <KeyPress>/<Return>, which is what the
        # two handlers expect -- confirm against the original listing.
        text = self.display.component('text')
        text.bind('<KeyPress>', self.doKeypress)
        text.bind('<Return>', self.doEnter)

        for row in keys:
            rowa = Frame(self, bg='gray40')     # holds the small labels
            rowb = Frame(self, bg='gray40')     # holds the key buttons
            for p1, p2, p3, color, ktype, func in row:
                # Default arguments bind the current values of self and
                # func, so every button keeps its own action.
                if ktype == FUN:
                    command = lambda s=self, a=func: s.evalAction(a)
                else:
                    command = lambda s=self, k=func: s.keyAction(k)
                SLabel(rowa, p2, p3)
                Key(rowb, text=p1, bg=color, command=command)
            rowa.pack(side=TOP, expand=YES, fill=BOTH)
            rowb.pack(side=TOP, expand=YES, fill=BOTH)


class Evaluator:
    """Runs calculator input as Python code in a private namespace.

    The namespace is pre-loaded with everything exported by ``math``.
    """

    def __init__(self):
        self.myNameSpace = {}
        self.runpython("from math import *")

    def runpython(self, code):
        """Evaluate *code* and return its result as text.

        Returns the repr() of the value when *code* is an expression,
        None when *code* is a statement that executed cleanly, and the
        string 'Error' when evaluation or execution fails.

        WARNING: this uses eval()/exec on whatever text it is given; it
        must only ever be fed input typed by the local user.
        """
        namespace = self.myNameSpace
        try:
            # The result has to be evaluated and converted to text; the
            # previous code returned the expression source as a literal
            # string and never called eval() at all.
            return repr(eval(code, namespace, namespace))
        except SyntaxError:
            # Not an expression -- try it as a statement instead.
            try:
                exec(code, namespace, namespace)
            except Exception:
                return 'Error'
        except Exception:
            # e.g. NameError or ZeroDivisionError in an expression:
            # report it rather than letting it escape into Tk's
            # callback machinery.
            return 'Error'
        return None


Calculator().mainloop()