import os
import re
#import sys
#shutil
def matchline(line, match):
    """Check one line of text against a compiled regular expression.

    When the pattern is found, a separator banner and the line itself are
    printed.

    line  -- the text to examine
    match -- compiled pattern object (anything exposing ``search``)

    Returns 0 when the pattern occurs in the line, -1 otherwise.
    """
    if match.search(line) is None:
        return -1
    print("-----------------------------------------")
    print("Got the line...." + line)
    return 0
def readfilestring(filename, match):
    """Scan a file line by line and report every line matching *match*.

    filename -- path of the file to read
    match    -- compiled pattern object handed to matchline()

    For each matching line matchline() prints the line, and this function
    then prints the name of the file it came from. Returns None.
    """
    # The with-block closes the file even if reading fails part-way.
    with open(filename, 'r') as handle:
        for line in handle:
            if matchline(line, match) == 0:
                print("<----in file" + filename)
                print("-----------------------------------------")
def search_file(search_path, filename, stringpatt):
    """Recursively search a directory tree for text inside matching files.

    search_path -- directory to walk
    filename    -- compiled pattern; files whose name matches are read
    stringpatt  -- compiled pattern searched for in each matching file

    Directories that cannot be listed are reported and skipped.
    Returns None.
    """
    try:
        entries = os.listdir(search_path)
    except OSError as e:
        print("Search error for OS issue - ")
        print(e)
        return
    for entry in entries:
        full_path = os.path.join(search_path, entry)
        if os.path.isdir(full_path):
            search_file(full_path, filename, stringpatt)
        elif os.path.isfile(full_path):
            # Only the bare file name is tested against the name pattern.
            if filename.search(entry) is not None:
                readfilestring(full_path, stringpatt)
        else:
            print("Unknown filesystem object - " + full_path)
if __name__ == '__main__':
    # Interactive entry point: ask for the root folder, a file-name pattern
    # and a text pattern, then search the tree.
    search_path = raw_input("Enter search_path: ")
    filepattern = raw_input("Enter filepattern: ")
    stringpattern = raw_input("Enter stringpattern: ")
    # retest names the pattern currently being compiled, so that a
    # failure can say which of the two inputs was invalid.
    retest = "filepattern"
    try:
        ire = re.compile(filepattern, re.I)
        retest = "stringpattern"
        strre = re.compile(stringpattern, re.I)
    except re.error as e:
        print("Reg Ex problem - for " + retest)
        print(e)
    else:
        if os.path.exists(search_path):
            search_file(search_path, ire, strre)
        else:
            print("Not a valid path - " + search_path)
Find text/pattern under parent folder from matched file names
XML to CSV Converter
def xml2csv(inputFile, outputFile, elemName, ns, columnsToExtract):
    """Convert repeated XML elements into rows of a CSV file.

    inputFile        -- XML source handed to etree.iterparse
    outputFile       -- path of the CSV file to (over)write
    elemName         -- local name of the element that represents one row
    ns               -- namespace prefix prepended to elemName to build the
                        tag, and stripped from the front of each child tag
    columnsToExtract -- column names, in output order; child elements not
                        listed are ignored, missing ones are written as ''

    NOTE(review): the ``tag=`` keyword suggests ``etree`` is lxml.etree --
    confirm against the file's imports.
    """
    outFile = open(outputFile, 'w')
    try:
        writer = csv.DictWriter(outFile, columnsToExtract,
                                extrasaction='ignore', restval='')
        # DictWriter.writerow() needs a mapping, so the header row is
        # written with writeheader() rather than by passing the list.
        writer.writeheader()
        # iterparse keeps only part of the document in memory; each
        # record is cleared as soon as its row has been collected.
        for event, rec in etree.iterparse(inputFile, tag="%s%s" % (ns, elemName)):
            row = {}
            for child in rec:
                # An empty element has text None; treat it as ''.
                row[child.tag[len(ns):]] = (child.text or '').strip()
            rec.clear()
            writer.writerow(row)
    finally:
        outFile.close()
XLS to CSV converter
def xls2csv(inputFile, outputFile):
    """Write the first sheet of an Excel workbook to a CSV file.

    inputFile  -- path of the workbook, opened with xlrd.open_workbook
    outputFile -- path of the CSV file to (over)write

    Both paths are echoed to standard output first. Returns None.
    """
    print("Input File : %s" % (inputFile))
    print("Output File : %s" % (outputFile))
    outFile = open(outputFile, 'w')
    try:
        writer = csv.writer(outFile, delimiter=',', quotechar='"')
        book = xlrd.open_workbook(inputFile)
        sheet = book.sheet_by_index(0)
        for row in range(sheet.nrows):
            writer.writerow(sheet.row_values(row))
    finally:
        # Closing flushes the buffered rows to disk.
        outFile.close()
Spell Corrector in Python
import re, collections
def words(text):
    """Return every run of the letters a-z in *text*, lower-cased, in order."""
    lowered = text.lower()
    return re.findall('[a-z]+', lowered)
def train(features):
    """Build a frequency model from an iterable of features.

    Every count starts at 1, so a feature that was never seen still reads
    as 1 and a feature seen n times reads as n + 1.

    Returns a collections.defaultdict mapping feature -> count.
    """
    counts = collections.defaultdict(lambda: 1)
    for feature in features:
        counts[feature] = counts[feature] + 1
    return counts
# Word-frequency model, built once at import time from the training
# corpus 'big.txt' in the current directory.
with open('big.txt') as corpus:
    NWORDS = train(words(corpus.read()))
# Letters tried for substitutions and insertions in edits1().
alphabet = 'abcdefghijklmnopqrstuvwxyz'
def edits1(word):
    """Return the set of strings exactly one edit away from *word*.

    An edit is deleting one character, swapping two adjacent characters,
    replacing one character with a letter of `alphabet`, or inserting a
    letter of `alphabet` at any position.
    """
    pairs = [(word[:cut], word[cut:]) for cut in range(len(word) + 1)]
    result = set()
    for head, tail in pairs:
        if tail:
            # deletion
            result.add(head + tail[1:])
            if len(tail) > 1:
                # transposition of the two characters after the cut
                result.add(head + tail[1] + tail[0] + tail[2:])
        for letter in alphabet:
            if tail:
                # replacement
                result.add(head + letter + tail[1:])
            # insertion
            result.add(head + letter + tail)
    return result
def known_edits2(word):
    """Return the words present in NWORDS that are two edits away from *word*."""
    found = set()
    for first in edits1(word):
        for second in edits1(first):
            if second in NWORDS:
                found.add(second)
    return found
def known(words):
    """Return the subset of *words* that are present in NWORDS, as a set."""
    recognised = set()
    for candidate in words:
        if candidate in NWORDS:
            recognised.add(candidate)
    return recognised
def correct(word):
    """Return the most frequent known candidate spelling for *word*.

    Candidates are taken from the first non-empty group of: the word
    itself if known, known words one edit away, known words two edits
    away, and finally the unchanged word.
    """
    candidates = known([word])
    if not candidates:
        candidates = known(edits1(word))
    if not candidates:
        candidates = known_edits2(word)
    if not candidates:
        candidates = [word]
    return max(candidates, key=NWORDS.get)
Format a plain ASCII file using Python script
#!/bin/env python
# Paginate a text file, adding a header and footer
import sys, time, string
# Exactly one argument (the file to paginate) is expected; anything else
# prints the usage line and stops.
if not len(sys.argv) == 2:
    print('Usage: pyprint filename')
    sys.exit(0)
class PrinterFormatter:
    """Write lines to standard output, split into pages with a header.

    Each page starts with a header line holding the file name, the page
    number in square brackets and the time the formatter was created.
    """

    def __init__(self, filename, page_len=58):
        """filename is shown in the header; page_len is the line count
        after which a new page is started."""
        # Save the time of creation for inclusion in the header
        self.now = time.asctime(time.localtime(time.time()))
        # Save the filename and page length
        self.filename = filename
        self.page_len = page_len
        # Zero all the counters
        self.page = 0
        self.count = 0
        self.header_written = 0

    def write_header(self):
        """Start a new page by writing its header, unless one was just written."""
        # If the header for this page has just been written, don't
        # write another one.
        if self.header_written:
            return
        # Increment the page count, and reset the line count
        self.header_written = 1
        self.count = 1
        self.page = self.page + 1
        # File name on the left, "[page]" ending at column 40, creation
        # time right-aligned so the line is 79 characters wide.
        p = str(self.page)
        header = self.filename.ljust(38 - len(p)) + '[' + p + ']'
        header = header.ljust(79 - len(self.now)) + self.now
        sys.stdout.write(header + '\n\n')

    def writeline(self, L):
        """Write one line of text, starting a new page first when the
        current one is full."""
        # If the line's length is an exact multiple of 80 characters,
        # chop off any trailing newline, since the printhead will wrap
        # around
        length = len(L)
        if (length % 80) == 0 and L and L[-1] == '\n':
            L = L[:-1]
        # If we've printed a pageful of lines, output a form feed and
        # output the header.
        if self.count > self.page_len:
            # The original wrote an empty string here, which emits
            # nothing; a form feed is the page break the comments
            # describe.
            sys.stdout.write('\f')
            self.write_header()
        # Print the actual line of text
        sys.stdout.write(L)
        self.count = self.count + 1
        self.header_written = 0
# Open the input file, and create a PrinterFormatter object, passing
# it the filename to put in the page header.
f = open(sys.argv[1], 'r')
try:
    o = PrinterFormatter(sys.argv[1])
    o.write_header()  # Print the header on the first page
    # Iterate over all the lines in the file; the writeline() method will
    # output them and automatically add page breaks and headers where
    # required.
    for L in f:
        o.writeline(L)
    # Write a final page break (form feed). The original wrote an empty
    # string, which emits nothing.
    sys.stdout.write('\f')
finally:
    # Close the input file even if formatting fails.
    f.close()
Control Structures in Python
If
====
# basic "If"
if boolean:
    x = 1
else:
    x = 0
# Not so basic "If"
# Notice the new keywords!
# The first condition is logically the same as
# "(not boolean) or otherboolean".
if (boolean and otherboolean) or (not boolean):
    x = 1
elif not otherboolean:
    x = 0
else:
    # Never reached with these particular conditions: whenever the first
    # test fails, otherboolean is false and the elif branch runs.
    x = 100000000000000000000  # Big numbers make if statements less boring!
While
=====
# This is it...
while i < 100:
    i += 1  # "i + 1" alone would never change i and the loop would not end
# More generally:
while condition:
    """do the following"""
    # ...
    # ...
    condition = False
# Infinite Loop:
while 1:
    print("I'm loopy!")
FOR
====
# Basic For Loop: visit each element of a sequence in turn
for item in list:
    dosomething(item)
# The traditional C for loop has a different form in Python:
# iterate over the integers 0..99
for x in range(100):
    dosomething(x)
# More interesting example:
def buglove(UIUC):
    """Crush each bug in UIUC whose type equals that of a
    chinese_lady_beetle instance, and hug every other bug."""
    for bug in UIUC:
        if type(bug) != type(chinese_lady_beetle()):
            hug(bug)
        else:
            crush(bug)
Python script to zip and unzip files
# Simple Application/Script to Compress a File or Directory
# Essentially you could use this instead of Winzip
"""
Path can be a file or directory
Archname is the name of the to be created archive
"""
from zipfile import ZipFile, ZIP_DEFLATED
import os # File stuff
import sys # Command line parsing
def zippy(path, archive):
    """Add every file below directory *path* to the open zip *archive*.

    path    -- directory to descend into
    archive -- object with a ``write(filename)`` method (a ZipFile opened
               for writing)

    Sub-directories are handled by recursion. Returns None.
    """
    for entry in os.listdir(path):
        # Prefix the entry with its parent so it can be found from the
        # current working directory.
        full = os.path.join(path, entry)
        if not os.path.isdir(full):
            archive.write(full)
            continue
        zippy(full, archive)
    return
def zipit(path, archname):
    """Compress a file or a whole directory tree into a new zip archive.

    path     -- file or directory to compress
    archname -- name of the archive to create (overwritten if present)

    Returns a success message naming *path*.
    """
    # Create a ZipFile Object primed to write
    archive = ZipFile(archname, "w", ZIP_DEFLATED)  # "a" to append, "r" to read
    try:
        # Recurse or not, depending on what path is
        if os.path.isdir(path):
            zippy(path, archive)
        else:
            archive.write(path)
    finally:
        # Always close, so the handle is released even when adding a
        # file fails.
        archive.close()
    return "Compression of \"" + path + "\" was successful!"
# Usage text shown when too few command-line arguments are given. The
# pieces need explicit separators, otherwise they run together.
instructions = "zipit.py: Simple zipfile creation script.\n" + \
    "recursively zips files in a directory into " + \
    "a single archive.\n" + \
    "e.g.: python zipit.py myfiles myfiles.zip"
# The __name__ == "__main__" test is true only when this file is run as
# a script from the command line, not when it is imported.
if __name__ == "__main__":
    if len(sys.argv) < 3:
        # Too few arguments: explain how to call the script.
        print(instructions)
    else:
        result = zipit(sys.argv[1], sys.argv[2])
        print(result)
================================================================================
# Simple script to Unzip archives created by
# our Zip Scripts.
import sys
import os
from zipfile import ZipFile, ZIP_DEFLATED
def unzip(path):
    """Extract every member of the zip archive *path* into the current
    working directory.

    Missing directories, including nested ones, are created as needed.

    Returns a success message naming *path*.
    Raises ValueError if a member name is absolute or contains '..',
    because such a name would be written outside the working directory.
    """
    # Create a ZipFile Object Instance
    archive = ZipFile(path, "r", ZIP_DEFLATED)
    try:
        for name in archive.namelist():
            parts = name.replace("\\", "/").split("/")
            if os.path.isabs(name) or ".." in parts:
                raise ValueError("Unsafe path in archive: " + name)
            directory = os.path.dirname(name)
            # dirname is '' for top-level members; there is nothing to
            # create in that case.
            if directory and not os.path.exists(directory):
                os.makedirs(directory)
            # A name ending in '/' is a directory entry, not a file.
            if name.endswith("/"):
                continue
            # Write files to disk
            temp = open(name, "wb")
            try:
                temp.write(archive.read(name))
            finally:
                temp.close()
    finally:
        archive.close()
    return "\"" + path + "\" was unzipped successfully."
# Usage text shown when the script is not given exactly one argument;
# the example goes on its own line.
instructions = "This script unzips plain jane zipfiles:\n" + \
    "e.g.: python unzipit.py myfiles.zip"
if __name__ == "__main__":
    # Exactly one argument, the archive to extract, is expected.
    if len(sys.argv) != 2:
        print(instructions)
    else:
        msg = unzip(sys.argv[1])
        print(msg)
Fast and efficient Backup script that works on Windows and Linux
import os, os.path
import subprocess
import time
def backUpDir(path):
    """
    Back up an existing directory and create a fresh, empty one.

    If *path* exists it is renamed to *path* plus a date-and-time suffix
    (for example ``data-2020-01-31-10h15m00s``); a new empty directory is
    then created at *path*. If *path* does not exist it is simply created.

    On success, returns a list consisting of two values:
    0: to signify the success
    None: means no error occurred.
    On error, returns a list consisting of two values:
    -1 : to signify the failure
    error string: the exact error string
    """
    try:
        if os.path.exists(path):
            # dir exists then backup old dir and create new
            backupDir = path + time.strftime('-%Y-%m-%d-%Hh%Mm%Ss')
            # os.rename works on every platform, so no shell command is
            # needed. The original ran "mov" on NT, which is not a
            # command, and passed an argument list together with
            # shell=True on POSIX, which drops the arguments.
            os.rename(path, backupDir)
        # create new dir
        os.mkdir(path)
    except OSError as err:
        return [-1, str(err)]
    return [0, None]
File Splitter Script
#!/usr/bin/env python
# encoding: utf-8
import sys
from math import *
import os
import random
from Tkinter import *
import tkFileDialog
def linesplit():
    """Interactively split a text file into numbered chunk files.

    The file is chosen through a file dialog and the number of records
    per chunk is read from the console. Chunks are written to
    c:\\Temp\\split_<n>.txt, with n starting at 1; the files are opened
    in append mode, so earlier content is kept.

    Returns a tuple (number of lines processed, 1).
    Raises ValueError if the record count is not a positive integer.
    """
    k1 = tkFileDialog.askopenfilename()
    ff = open(k1)
    try:
        total = sum(1 for _ in ff)
    finally:
        ff.close()
    print("total number of lines is:" + str(total))
    # int() instead of eval(): never execute what the user typed.
    user_ask = int(raw_input("Specify the number of records in each file = "))
    print(user_ask)
    if user_ask <= 0:
        raise ValueError("number of records per file must be positive")
    line = 0
    chunk = None
    ff = open(k1)
    try:
        for lineee in ff:
            # Start the next chunk file every user_ask lines. Every line
            # is written; the original skipped each line that fell on a
            # chunk boundary.
            if line % user_ask == 0:
                if chunk is not None:
                    chunk.close()
                chunk = open(r"c:\Temp\split_" + str(line // user_ask + 1) + ".txt", 'a+')
            chunk.write(lineee)
            line = line + 1
    finally:
        if chunk is not None:
            chunk.close()
        ff.close()
    flag = 1
    return (line, flag)
# Run the interactive split once; the last element of the returned pair
# is the completion flag.
k = linesplit()
flag = k[-1]
Python script for walking the directory tree structure but excluding some directories and files
from os.path import join, getsize
for root, dirs, files in walk('python/Lib/email'):
print root, "consumes",
print sum([getsize(join(root, name)) for name in files]),
print "bytes in", len(files), "non-directory files"
if 'CVS' in dirs:
dirs.remove('CVS') # don't visit CVS directories
"""
from os.path import join, isdir, islink
from os import error, listdir
# We may not have read permission for top, in which case we can't
# get a list of the files the directory contains. os.path.walk
# always suppressed the exception then, rather than blow up for a
# minor reason when (say) a thousand readable directories are still
# left to visit. That logic is copied here.
try:
# Note that listdir and error are globals in this module due
# to earlier import-*.
names = listdir(top)
except error, err:
if onerror is not None:
onerror(err)
return
if exclude_list != []:
from copy import deepcopy
temp_name = deepcopy(names)
names = [item for item in temp_name if item not in exclude_list]
dirs, nondirs = [], []
for name in names:
if isdir(join(top, name)):
dirs.append(name)
else:
nondirs.append(name)
if topdown:
yield top, dirs, nondirs
for name in dirs:
path = join(top, name)
if not islink(path):
for x in walkExclusive(path, topdown, onerror, exclude_list):
yield x
if not topdown:
yield top, dirs, nondirs
Python XML Parser
import xml.dom.minidom
class XMLParser:
    """Read a field-mapping XML file into plain Python structures.

    Each element directly below the document's root element is one
    "layer"; its own attributes and the attributes of its child elements
    are collected.
    """

    def __init__(self, filePath):
        """filePath is whatever xml.dom.minidom.parse accepts (a file
        name or an open file object)."""
        self.xml_file_path = filePath

    def get_a_document(self):
        """Parse the XML source and return the minidom Document."""
        return xml.dom.minidom.parse(self.xml_file_path)

    def process_xml_return_dict(self):
        """Return {layer name: (attribute list, field list)}.

        If several layers share a node name, the last one wins.
        """
        doc = self.get_a_document()
        # documentElement is the root element; childNodes[0] could be a
        # comment or DOCTYPE that precedes it.
        fieldMapping = doc.documentElement
        operations = {}
        for layer in fieldMapping.childNodes:
            # Only elements carry attributes; skip text, comments, etc.
            if layer.nodeType != xml.dom.minidom.Node.ELEMENT_NODE:
                continue
            fieldList = self.get_fields(layer)
            attrList = self.get_attributes(layer)
            operations[layer.nodeName] = (attrList, fieldList)
        return operations

    def get_attributes(self, node):
        """Return the attributes of *node* as a list of (name, value) tuples."""
        attrList = []
        nodeMap = node.attributes
        for index in range(nodeMap.length):
            attrName = nodeMap.item(index).name
            attrList.append((attrName, node.getAttribute(attrName)))
        return attrList

    def get_fields(self, node):
        """Return one (FromField, ToField, Value, Condition) tuple per
        child element of *node*; a missing attribute reads as ''."""
        fieldList = []
        for childNode in node.childNodes:
            if childNode.nodeType != xml.dom.minidom.Node.ELEMENT_NODE:
                continue
            fromField = childNode.getAttribute('FromField')
            toField = childNode.getAttribute('ToField')
            fieldValue = childNode.getAttribute('Value')
            condition = childNode.getAttribute('Condition')
            fieldList.append((fromField, toField, fieldValue, condition))
        return fieldList
Python pickle module & Zip file creation
Description
This example shows how to save and reload a Python object with the pickle module, and how to create and read a Zip file.
# below is a typical Python dictionary object of roman numerals
romanD1 = {'I':1,'II':2,'III':3,'IV':4,'V':5,'VI':6,'VII':7,'VIII':8,'IX':9,'X':10}
# to save a Python object like a dictionary to a file
# and load it back intact you have to use the pickle module
import pickle
print("The original dictionary:")
print(romanD1)
# Pickle data is bytes, so the file is opened in binary mode; the
# with-block closes it even if dumping fails.
with open("roman1.dat", "wb") as outfile:
    pickle.dump(romanD1, outfile)
# now load the dictionary object back from the file ...
with open("roman1.dat", "rb") as infile:
    romanD2 = pickle.load(infile)
print("Dictionary after pickle.dump() and pickle.load():")
print(romanD2)
# Large text can also be stored in, and read back from, a zipped file
# (PKZIP format); the calls differ a little from normal file read/write.
import zipfile
zfilename = "English101.zip"
# NOTE(review): str1 and str2 are not defined anywhere in this snippet;
# they must be set to the text to store before this runs.
# The archive member is given the same name as the archive itself.
zout = zipfile.ZipFile(zfilename, "w")
zout.writestr(zfilename, str1 + str2)
zout.close()
# read the zipped member back in
zin = zipfile.ZipFile(zfilename, "r")
strz = zin.read(zfilename)
zin.close()
print("Testing the contents of %s:" % zfilename)
print(strz)
IP Address Generation
# Wraps a dotted-quad IP address string and steps to the next address.
class IP
  def initialize(ip)
    @ip = ip
  end

  # The address as the wrapped string.
  def to_s
    @ip
  end

  # Two objects are equal when their string forms are equal, so an IP
  # can be compared directly with a plain String.
  def ==(other)
    to_s == other.to_s
  end

  # Returns the next address as a new string. "255.255.255.255" is
  # returned unchanged. Raises ArgumentError for a part that is neither
  # below 255 nor exactly "255".
  def succ
    return @ip if @ip == "255.255.255.255"

    octets = @ip.split('.').reverse
    octets.each_with_index do |octet, position|
      if octet.to_i < 255
        octet.succ!
        break
      end
      unless octet == "255"
        raise ArgumentError, "Invalid number #{octet} in IP address"
      end
      # Roll over to 0 and carry into the next part; the leading part
      # (index 3 after the reverse) is left alone.
      octet.replace("0") unless position == 3
    end
    octets.reverse.join('.')
  end

  # Advances this object in place and returns the updated string.
  def succ!
    @ip.replace(succ)
  end
end
print "Input Starting IP Address: "
start_ip = gets.strip
print "Input Ending IP Address: "
end_ip = gets.strip
i = IP.new(start_ip)
# Writes every address after the start address, up to and including the
# end address, one per line. The block form closes the file even if
# IP#succ! raises on a malformed address.
File.open("ips.txt", "w") do |ofile|
  until i == end_ip
    # IP#succ leaves 255.255.255.255 unchanged, so an end address that is
    # never reached would otherwise loop forever.
    break if i.to_s == "255.255.255.255"
    ofile.puts i.succ!
  end
end
Orphan Checker
# Lists every file below the current directory that is not named in
# links.txt and writes the result to orphans.txt.
links = Array.new
orphans = Array.new
start_dir = Dir.getwd
dir_array = [start_dir]
unless File.readable?("links.txt")
  puts "File is not readable."
  exit
end
File.open('links.txt', 'rb') do |lv|
  lv.each_line do |line|
    links << line.chomp
  end
end
# Breadth-first walk using full paths. The working directory is never
# changed, so orphans.txt is written next to links.txt rather than in
# whichever directory happened to be visited last.
until dir_array.empty?
  dir = dir_array.shift
  Dir.foreach(dir) do |filename|
    next if filename == '.' or filename == '..'
    full_path = dir + File::SEPARATOR + filename
    if File.directory?(full_path)
      dir_array << full_path
    else
      orphans << full_path
    end
  end
end
orphans -= links
File.open(File.join(start_dir, "orphans.txt"), "wb") do |o|
  o.puts orphans
end
HTML Email using Python
def createhtmlmail( html, text, subject ):
"""Create a mime-message that will render HTML in popular
MUAs, text in better ones"""
import MimeWriter
import mimetools
import cStringIO
out = cStringIO.StringIO() # output buffer for our message
htmlin = cStringIO.StringIO(html)
writer = MimeWriter.MimeWriter(out)
#
# set up some basic headers... we put subject here
# because smtplib.sendmail expects it to be in the
# message body
#
writer.addheader("Subject", subject)
writer.addheader("MIME-Version", "1.0")
#
# start the multipart section of the message
# multipart/alternative seems to work better
# on some MUAs than multipart/mixed
#
writer.startmultipartbody("alternative")
writer.flushheaders()
#
# the plain text section
#
if text != None:
txtin = cStringIO.StringIO(text)
subpart = writer.nextpart()
subpart.addheader("Content-Transfer-Encoding", "quoted-printable")
pout = subpart.startbody("text/plain", [("charset", 'us-ascii')])
mimetools.encode(txtin, pout, 'quoted-printable')
txtin.close()
#
# start the html subpart of the message
#
subpart = writer.nextpart()
subpart.addheader("Content-Transfer-Encoding", "quoted-printable")
#
# returns us a file-ish object we can write to
#
pout = subpart.startbody("text/html", [("charset", 'us-ascii')])
mimetools.encode(htmlin, pout, 'quoted-printable')
htmlin.close()
#
# Now that we're done, close our writer and
# return the message body
#
writer.lastpart()
msg = out.getvalue()
out.close()
#print msg
return msg
def sendmail(sender, to, subject, htmlFilename, smtp_host="mta-hub"):
    """Send the contents of an HTML file as an e-mail.

    sender       -- envelope sender address
    to           -- recipient address or list of addresses
    subject      -- Subject header of the message
    htmlFilename -- path of the file whose contents become the HTML body
    smtp_host    -- SMTP server to connect to (defaults to "mta-hub")

    The message has no plain-text part. Returns None.
    """
    import smtplib
    with open(htmlFilename, 'r') as f:
        html = f.read()
    message = createhtmlmail(html, None, subject)
    server = smtplib.SMTP(smtp_host)
    try:
        server.sendmail(sender, to, message)
    finally:
        # Always end the SMTP session, even if sending fails.
        server.quit()
Python interface to load commands.
This is just a code snippet illustrating the idea. Adding features such as exception handling, fancier formatting and error redirection is left to you!
""" This Function loads a command file into memory as a dictonary , With Commands as keys"""
def load_command_file(file_name):
    """Load a command file into a dictionary keyed by command name.

    Each useful line has the form ``name = template``. Only the first
    '=' separates the two, so a template may itself contain '='. Lines
    without '=' (blank lines, for instance) are skipped.

    file_name -- path of the command file

    Returns the dictionary; it is empty when the name check below fails.
    """
    command_dict = {}
    # NOTE(review): this rejects names made up purely of letters, which
    # does not obviously match the "ascii" wording of the message. Kept
    # as found -- confirm the intended check.
    if file_name.isalpha():
        print("Error!!command file name should be in ascii\n")
        return command_dict
    with open(file_name, 'r') as command_file:
        for line in command_file:
            name, separator, template = line.partition("=")
            if not separator:
                continue
            command_dict[name.strip()] = template.strip()
    return command_dict
########################################################################################################
""" This function uses the above mentioned methods to create commands based on passed args"""
def create_command(*args):
    """Build a command string from a template in the command file.

    The last positional argument names the command; the arguments before
    it are substituted, in order, for the '%s' placeholders of that
    command's template.

    Prints an error and calls sys.exit() when the number of placeholders
    does not match the number of values. Returns the finished command.
    """
    command_dict = load_command_file(r"C:\Documents and Settings\Raj\Desktop\command_file.txt")
    values, command_name = args[:-1], args[-1]
    command_str = command_dict[command_name]
    if command_str.count('%s') != len(values):
        print("Error mismatch in args passed and args actually needed")
        sys.exit()
    return command_str % values
Python script to Validate Tabular format Data and database restore
Our Python utility provides all of the following functionality.
Major functionality:
- Processes a batch of large CSV/text files
- Can iterate through sub-folders
- Files can be filtered using criteria expressed as regular expressions
- Reads CSV/text files
- Performs sanity and basic column-based validation checks such as null values, duplicate values, max, min, unique values, etc.
- Infers the data type of every column
- Connects to SQL Server
- Creates tables on the fly and bulk-inserts any given text/CSV file
- Logs all activities using a separate logger module
- Highly configurable
Main.py:
import Utility
import Logger
import FieldTypes
import SQLServerConnection
import datetime
import os
import re
import string
import pyodbc
#start time
Logger.LogMessage(str(datetime.datetime.now()))
# NOTE(review): the original also ran "dbConn = SQLServerConnection()".
# SQLServerConnection is imported as a module, so that call can only
# raise TypeError, and dbConn was never used afterwards; it was dropped.
# Confirm that importing the module is enough to open the connection.
# Utility is likewise a module here; the class of the same name inside
# it is what has to be instantiated.
temp = Utility.Utility()
#read source directory
reader = open("srcdir.txt","rb")
try:
    # strip() removes the line ending that readline() keeps
    srcdir = reader.readline().strip()
finally:
    reader.close()
#iterate source directory
temp.iterateFolder(srcdir)
temp.reportProfile()
print("profiling done")
#end time
Logger.LogMessage(str(datetime.datetime.now()))
print(datetime.datetime.now())
Utility.py
import SQLServerConnection
import Field
import csv
import os
import re
import string
from Logger import Logger
import datetime
class Utility:
    """Profile tab-separated text files and bulk-load them into SQL Server.

    For every file processed, one summary row per column is collected in
    profileList; reportProfile() writes those rows to the profile CSV
    that __init__ opens.
    """
    fileName = ""

    def __init__(self):
        """Open the profile output file and write its header row."""
        #map for fieldName -> fieldInformation
        self.data = dict()
        self.profileOutput = open('C:\\temp\\profile.csv','w')
        self.output = ['FileName','FieldName','InferredDataType','Len','TotalRows','NullValues','UniqueValues']
        self.profileOutput.write(','.join(self.output))
        self.profileOutput.write('\n')
        self.profileList = []

    def readcvsFile(self, fileName, onlyName):
        """Profile one tab-separated file, create a table for it and
        bulk-insert its contents.

        fileName -- full path of the file
        onlyName -- bare file name; the part before the extension is
                    used as the table name

        Rows whose column count differs from the header are logged and
        counted as errors.
        """
        FIELDSEPARATER = "\t"
        self.data = dict()
        #read as a text file
        reader = open(fileName,"rb")
        try:
            #header row
            headers = reader.readline().split(FIELDSEPARATER)
            Column = len(headers)
            #initialise the map
            for column in headers:
                self.data[column]=Field.Field(column.rstrip())
            Logger.LogMessage("No of column: " + str(Column))
            #process every row
            RowCount = 0
            ErrorRow = 0
            for rowstring in reader:
                RowCount = RowCount + 1
                row = rowstring.split(FIELDSEPARATER)
                rowCol = len(row)
                if rowCol == Column:
                    for header, value in zip(headers, row):
                        self.data[header].addValue(value)
                else:
                    Logger.LogMessage("Row number " + str(RowCount) + " is not valid. It has " + str(rowCol) + " columns.")
                    ErrorRow = ErrorRow + 1
        finally:
            reader.close()
        Logger.LogMessage("No of Rows processed(Other than header file):"+str(RowCount))
        Logger.LogMessage("Total Errors: "+str(ErrorRow))
        # splitext copes with names that contain more than one dot
        name = os.path.splitext(onlyName)[0]
        # Columns are emitted in header order. BULK INSERT maps file
        # columns to table columns by position, so dictionary iteration
        # order must not be used here.
        columnDefs = []
        seen = set()
        for header in headers:
            if header in seen:
                continue
            seen.add(header)
            v = self.data[header]
            #mark end of processing and take profiled output for each field
            v.endOfLoading()
            self.profileList.append([fileName, v.FieldName, v.FieldString, str(v.FieldSize), str(v.RecordSize), str(v.NullValues), str(v.UniqueValues)])
            columnDefs.append(v.FieldName + " varchar(1000) ")
            v.clear()
        createFile = "create table " + name + "(" + ", ".join(columnDefs) + ")"
        Logger.LogMessage("Sql for file creation:" + createFile)
        # NOTE(review): table and column names come straight from the
        # file and are not quoted or validated.
        SQLServerConnection.cursor.execute(createFile)
        SQLServerConnection.conn.commit()
        # A single quote in the path is doubled so that it cannot end
        # the SQL string literal early.
        bulkcopySQL = "BULK INSERT " + name + " FROM '" + fileName.replace("'", "''") + "' WITH ( FIELDTERMINATOR='\t',FIRSTROW=2,ROWTERMINATOR='" + chr(10) + "')"
        Logger.LogMessage("running bluk copy:" + bulkcopySQL)
        SQLServerConnection.cursor.execute(bulkcopySQL)
        SQLServerConnection.conn.commit()
        del self.data

    def iterateFolder(self,dir):
        """Process every file in *dir* whose name starts with
        ``<word characters>.csv`` and recurse into sub-directories."""
        fileexp = re.compile(r'\w*\.csv')
        #iterate for the directory
        for f in os.listdir(dir):
            fullPath = os.path.join(dir,f)
            if os.path.isfile(fullPath) and fileexp.match(f) is not None:
                Logger.LogMessage("***************************************")
                Logger.LogMessage("FileName: " + f)
                Logger.LogMessage("Directory Name: " + dir)
                self.readcvsFile(fullPath, f)
                Logger.LogMessage("*************************************")
                Logger.LogMessage(str(datetime.datetime.now()))
                Logger.LogMessage(" ")
                print("Processing  %s" % f)
                Logger.flush()
            # The sub-directory test must use the directory being
            # listed, not the current working directory.
            elif os.path.isdir(fullPath):
                print(fullPath)
                self.iterateFolder(fullPath)

    def reportProfile(self):
        """Write the collected profile rows to the profile output file."""
        for detail in self.profileList:
            self.profileOutput.write(','.join(detail))
            self.profileOutput.write('\n')
        # Make sure the rows reach the disk; the file is left open.
        self.profileOutput.flush()
Field.py:
from types import *
import re
import datetime
import Logger
class FieldTypes:
    """Enumeration of column data types, and inference of a column's
    type from its string values."""
    Null,Integer, Float, Varchar, Date, DateTime = range(6)
    intexp = re.compile(r'[0-9]+$')
    floatexp = re.compile(r'[0-9]*\.[0-9]+$')
    dateexp = re.compile(r'[0-9]{1,4}([.,/-])[0-9]{1,2}\1[0-9]{1,4}$')

    def getEnumString(self, type):
        """Return the type name for an enum value, or None if it is
        not one of the values defined on this class."""
        names = {
            self.Integer: "Integer",
            self.Float: "numeric",
            self.Varchar: "varchar",
            self.Date: "date",
            self.DateTime: "datetime",
            self.Null: "null",
        }
        return names.get(type)

    def fieldType(self,value, previousType):
        """Combine one new value with the type inferred so far.

        An empty value never changes the type. A Float column stays
        Float only for further floats and a Date column stays Date only
        for further dates. A Null or Integer column takes the type of
        the new value (integer, float or date). Anything else becomes
        Varchar.
        """
        # Values are compared with ==; identity of integers is an
        # implementation detail and must not be relied on.
        if len(value) == 0:
            return previousType
        if previousType == self.Float:
            if self.floatexp.match(value) is not None:
                return self.Float
        if previousType == self.Date:
            if self.dateexp.match(value) is not None:
                return self.Date
        if previousType == self.Null or previousType == self.Integer:
            if self.intexp.match(value) is not None:
                return self.Integer
            if self.floatexp.match(value) is not None:
                return self.Float
            if self.dateexp.match(value) is not None:
                return self.Date
        return self.Varchar

    def getFieldType(self,valueList):
        """Infer the type of a whole column from an iterable of values.

        Stops as soon as the type has degraded to Varchar, because no
        later value can change it.
        """
        Type = self.Null
        for value in valueList:
            if Type == self.Varchar:
                break
            Type = self.fieldType(value, Type)
        return Type
class Field:
    """One field (column) of a table.

    Holds the field's name, the longest value length seen, counters for
    records and null values and, when value storage is enabled, the set
    of distinct values from which the data type and unique-value count
    are derived.
    """

    def __init__(self, name, storeValue=0):
        """name is the column name; a non-zero storeValue enables
        keeping the distinct values, which type inference needs."""
        self.FieldName = name
        self.FieldSize = 0
        self.FieldType = FieldTypes.Null
        self.FieldValues = set()
        self.RecordSize = 0
        self.UniqueValues = 0
        self.FieldString = ""
        self.NullValues = 0
        self.storeVal = storeValue

    def addValue(self, val):
        """Account for one value of this field (an empty string counts
        as null)."""
        self.RecordSize = self.RecordSize + 1
        if len(val) == 0:
            self.NullValues = self.NullValues + 1
        else:
            if self.storeVal != 0:
                self.FieldValues.add(val.rstrip())
            if len(val) > self.FieldSize:
                self.FieldSize = len(val)

    def clear(self):
        """Drop the stored values to free memory."""
        # Rebinding to an empty set, rather than deleting the attribute,
        # keeps printValues() usable afterwards.
        self.FieldValues = set()

    def endOfLoading(self):
        """Finish profiling: derive the type, its name and the number
        of unique values."""
        if self.storeVal != 0:
            self.FieldType = FieldTypes().getFieldType(self.FieldValues)
        self.UniqueValues = len(self.FieldValues)
        self.FieldString = FieldTypes().getEnumString(self.FieldType)

    def printSummary(self):
        """Log the name, type name and counters of this field."""
        Logger.LogMessage(self.FieldName + " " + self.FieldString+ " " +str(self.FieldSize)+ str(self.RecordSize)+ str(self.NullValues)+ str(self.UniqueValues))

    def printValues(self):
        """Print every stored distinct value, one per line."""
        for value in self.FieldValues:
            print(value)
Python script to Load Configuration file content into variable
def LoadCfg(filename):
    """Read ``key = value`` lines from a configuration file into a dict.

    Keys and values are stripped of surrounding whitespace, and double
    quotes around a value are removed. Only the first '=' on a line
    separates key from value. Lines without '=' are skipped.

    filename -- path of the configuration file
    """
    result = {}
    with open(filename, "r") as fileptr:
        for line in fileptr:
            key, separator, value = line.partition("=")
            if not separator:
                # blank line or a line that is not an assignment
                continue
            result[key.strip()] = value.strip().strip('"')
    return result
Python Script to search file for specific content
def SearchFile(filename, key):
    """Report whether any line of a file contains the text *key*.

    filename -- path of the file to scan
    key      -- substring to look for

    Returns 1 as soon as a line containing *key* is found, otherwise 0.
    """
    # The "in" operator replaces string.find(), which needs the string
    # module and no longer exists in Python 3.
    with open(filename, "r") as fileptr:
        for line in fileptr:
            if key in line:
                return 1
    return 0
Network Wise File Concatenation
#!/usr/bin/env python
import sys, os, fnmatch, re
from threading import Thread
from threading import BoundedSemaphore
# At most 20 holders of this semaphore can run at the same time.
pool_sema = BoundedSemaphore(value=20)
# Line-count threshold compared against the merged file's line count.
maxsize = 200
source_directory = "/ranger/RangerRoot/RangerData/CDRDelegatorData/"
destination_directory = "/ranger/RangerRoot/RangerData/CDRDelegatorDataPerf/"
log_directory = "/ranger/RangerRoot/LOG/"
Success_Directory = "/ranger/RangerRoot/RangerData/CDRDelegatorData/success/"
Success_Directory_Perf = "/ranger/RangerRoot/RangerData/CDRDelegatorDataPerf/success/"
os.system('date')
# File-name patterns: two-digit prefixes 01 through 23, plus 90.
filters = ["%02d*" % i for i in range(1, 24)]
filters.append("90*")
print(filters)
def fnumlines(filename):
    """Return the number of newline characters in the file *filename*."""
    # The with-block closes the file; the original left it open.
    with open(filename) as f:
        return f.read().count("\n")
# Worker thread that appends the files of one filter group (filelist)
# to the first file of that group and moves each finished merged file
# to destination_directory, using shell commands for copy, move and
# remove.
# NOTE(review): the indentation of this class is missing in this copy of
# the file; the nesting of the statements below has to be restored
# before the code can run.
# NOTE(review): file names are interpolated into shell command strings
# without quoting.
class PROCESS(Thread):
# filelist: names of the files to merge; pool_sema: shared semaphore;
# network_id: the filter pattern that selected the files, also used to
# build the log file names.
def __init__(self, filelist, pool_sema, network_id):
Thread.__init__(self)
self.list = filelist
self.pool_sema = pool_sema
self.network_id = network_id
# Thread body: merge self.list[1:] into self.list[0].
def run(self):
# Take a slot of the shared semaphore; it is released on the last line.
self.pool_sema.acquire()
filename = self.list[0]
PerfSuccessFile = filename
# The log file name is the network id with every "*" removed, plus a
# fixed suffix.
logFile = re.sub("[*]","",self.network_id) + "CDRDelegatorConcatenation.log"
LogFullName = os.path.join(log_directory, logFile)
log = open ( LogFullName ,"a")
file = os.path.join(source_directory, filename)
# Copy the first file to /ranger/ravi/subscriber/ before touching it
# (presumably a safety copy -- confirm).
os.system('cp %s /ranger/ravi/subscriber/' % (file))
rm_success = os.path.join(Success_Directory,filename)
filename = file
touch_Perf = os.path.join(Success_Directory_Perf, PerfSuccessFile )
# Line count of the merged file so far.
numlines = fnumlines(file)
try:
# The first file is opened for appending; the others are added to it.
f = open(file, "a")
for filename_temp in self.list[1:]:
file = os.path.join(source_directory, filename_temp)
os.system('cp %s /ranger/ravi/subscriber/' % (file))
rm_success_file = os.path.join(Success_Directory, filename_temp)
log_file = self.network_id + "date.log"
log_date = os.path.join(log_directory, log_file)
# While the merged file has fewer than maxsize lines, append this file
# to it.
if numlines < maxsize:
try:
# The output of the shell "date" command is written to a scratch file
# and copied into the log as a timestamp.
os.system('date > %s' % ( log_date))
log.write(open(log_date).read())
log.write("%s is concatenating to %s \n " % (file,filename))
except IOError:
log.write("Error: can\'t find file or write the data \n")
try:
f.write(open(file).read())
except IOError:
os.system('date > %s' % ( log_date ))
log.write(open(log_date).read())
log.write("Error occured while reading the file %s \n" % (file))
else:
# Appended successfully: count its lines, then remove the source file
# and the file of the same name in Success_Directory.
log.write("%s has been concatenated to %s ... DONE\n" % (file,filename))
numlines += fnumlines(file)
os.system('rm %s' %(file) )
os.system('rm %s' %(rm_success_file))
else:
# The merged file is full: close it, move it away and create an empty
# file named after it in Success_Directory_Perf.
f.close()
# move the file filename
os.system('mv %s %s' %(filename, destination_directory))
touch_Perf = os.path.join(Success_Directory_Perf, PerfSuccessFile)
touch_file = open(touch_Perf,"w")
print touch_Perf
touch_file.close()
PerfSuccessFile = filename_temp
os.system('rm %s' % (rm_success))
# Use a new file
# NOTE(review): "file" already contains source_directory here, and
# "filename" is reassigned again two lines further down.
filename = os.path.join(source_directory, file)
rm_success = rm_success_file
numlines = fnumlines(file)
f = open(file, "a")
filename = file
finally:
# Cleanup: close the merged file and the log, move the last merged
# file and create the empty file for it in Success_Directory_Perf.
f.close()
log.close()
os.system('mv %s %s' %(filename, destination_directory))
touch_Perf = os.path.join(Success_Directory_Perf, PerfSuccessFile )
touch_file = open(touch_Perf,"w")
print touch_Perf
touch_file.close()
os.system('rm %s' % (rm_success))
self.pool_sema.release()
def derectory_listing(directory):
    """Return the names in *directory*, ordered by modification time.

    The oldest entry comes first; entries with equal times are ordered
    by name.

    The modification time is read from the entry inside *directory*
    itself. The original looked each name up in the global
    Success_Directory instead, which only gives the same result when
    that is the directory being listed.
    """
    stamped = []
    for name in os.listdir(directory):
        stamped.append((os.stat(os.path.join(directory, name)).st_mtime, name))
    stamped.sort()
    return [name for _, name in stamped]
#allfiles = os.listdir(sys.argv[1])
# List the directory given on the command line, oldest file first.
allfiles = derectory_listing(sys.argv[1])
threadlist = []
# Start one worker thread per filter that matches at least one file.
for filter in filters:
    files = fnmatch.filter(allfiles, filter)
    if files:
        thread = PROCESS(files, pool_sema, filter)
        threadlist.append(thread)
        thread.start()
# Wait for every worker before printing the closing timestamp.
for thread in threadlist:
    thread.join()
os.system('date')
#Usage: ./thread_merge_exception.py "Directory Name"
Python Script for Adding or Subtracting the Dates
#-----------------------------
# Adding to or Subtracting from a Date
# Use the rather nice datetime.timedelta objects
import datetime

now = datetime.date(2003, 8, 6)
difference1 = datetime.timedelta(days=1)
difference2 = datetime.timedelta(weeks=-2)
print("One day in the future is: %s" % (now + difference1))
#=> One day in the future is: 2003-08-07
print("Two weeks in the past is: %s" % (now + difference2))
#=> Two weeks in the past is: 2003-07-23
print(datetime.date(2003, 8, 6) - datetime.date(2000, 8, 6))
#=> 1095 days, 0:00:00
#-----------------------------
# The month is written as 1, not 01: a leading zero is an octal literal
# in Python 2 and a syntax error in Python 3.
birthtime = datetime.datetime(1973, 1, 18, 3, 45, 50)   # 1973-01-18 03:45:50
interval = datetime.timedelta(seconds=5, minutes=17, hours=2, days=55)
then = birthtime + interval
print("Then is %s" % then.ctime())
#=> Then is Wed Mar 14 06:02:55 1973
print("Then is %s" % then.strftime("%A %B %d %I:%M:%S %p %Y"))
#=> Then is Wednesday March 14 06:02:55 AM 1973
#-----------------------------
when = datetime.datetime(1973, 1, 18) + datetime.timedelta(days=55)
print("Nat was 55 days old on: %s" % when.strftime("%m/%d/%Y").lstrip("0"))
#=> Nat was 55 days old on: 3/14/1973
Program to create make file in VMS/UNIX
This program is used to run the make/build command on either the VMS or the UNIX platform. It can be imported as a reusable module by other programs that need to build a project on either platform.
#Program to create executable in either VMS or UNIX platform:
import os
import sys


def make(project_name, param=""):
    """Builds a project with unit test directives on.

    - project_name the name of the project (lower-cased before use).
    - param (optional) but can be set to any parameter (taken by
      make.com) for VMS and only "nodebug" for UNIX.

    The build command is run through os.system(); nothing is returned.
    """
    project_name = project_name.lower()
    if sys.platform == "OpenVMS":
        option = ''
        if param:
            option = ' /' + param
        make_command = '$ pre_c_defines = "define=(""UNITTEST"")"\n' \
            '$ pre_cxx_defines = "define=(""UNITTEST"")"\n' \
            '$ sym_comp_c_defines = "/define=UNITTEST"\n' \
            '$ sym_comp_cxx_defines = ' \
            '"/define=(UNITTEST,__USE_STD_IOSTREAM)"\n' \
            '$ make ' + project_name + option + '\n'
    else:
        # UNIX platform
        option = ''
        # Strings are compared with ==; "is" tests object identity and
        # only worked by accident.
        # NOTE(review): isdebug() is not defined in this snippet; it
        # must be provided elsewhere.
        if param == "nodebug" and isdebug():
            # This is the same as release command which is currently undefined.
            option = 'unset sym_debug;. setdef.ksh > /dev/null\n'
        make_command = option + \
            'export sym_proc_parm="define=UNITTEST"\n' \
            'export sym_comp_parm="-DUNITTEST"\n' \
            'rmake ' + project_name
    os.system(make_command)
    return
Python Code for creating Screen saver
import appuifw
from graphics import *
import e32
from key_codes import *
class Keyboard(object):
    """Track which keys are held down and how often each was pressed.

    handle_event() is meant to be used as the canvas event callback.
    """

    def __init__(self,onevent=lambda:None):
        # scancode -> 1 while the key is held, 0 after it was released
        self._keyboard_state={}
        # scancode -> number of presses not yet consumed by pressed()
        self._downs={}
        # Called after every handled event.
        self._onevent=onevent

    def handle_event(self,event):
        """Update the key state from a key-down or key-up event dict."""
        if event['type'] == appuifw.EEventKeyDown:
            code=event['scancode']
            # Count a press only on the transition from up to down, so
            # repeated key-down events for a held key are ignored.
            if not self.is_down(code):
                self._downs[code]=self._downs.get(code,0)+1
            self._keyboard_state[code]=1
        elif event['type'] == appuifw.EEventKeyUp:
            self._keyboard_state[event['scancode']]=0
        self._onevent()

    def is_down(self,scancode):
        """Return 1 if the key is currently held down, else 0."""
        return self._keyboard_state.get(scancode,0)

    def pressed(self,scancode):
        """Consume one pending press of the key; return True if there was one."""
        if self._downs.get(scancode,0):
            self._downs[scancode]-=1
            return True
        return False
keyboard=Keyboard()
appuifw.app.screen='full'
# Off-screen image; stays None until the canvas exists, so an early redraw
# callback has nothing to blit.
img=None


def handle_redraw(rect):
    """Copy the off-screen image to the canvas once the image exists."""
    if img:
        canvas.blit(img)


appuifw.app.body=canvas=appuifw.Canvas(
    event_callback=keyboard.handle_event,
    redraw_callback=handle_redraw)
img=Image.new(canvas.size)
running=1


def quit():
    """Exit-key handler: make the main loop stop after the current frame."""
    global running
    running=0


appuifw.app.exit_key_handler=quit
# Ball state: position and velocity as [x, y].
location=[img.size[0]/2,img.size[1]/2]
speed=[0.,0.]
blobsize=16
# Largest x and y position that keeps the whole blob inside the image.
xs,ys=img.size[0]-blobsize,img.size[1]-blobsize
gravity=0.03
acceleration=0.05
import time
start_time=time.clock()
n_frames=0
# Render the help text once into its own image and blit it every frame.
labeltext=u'Use arrows to move ball'
textrect=img.measure_text(labeltext, font='normal')[0]
text_img=Image.new((textrect[2]-textrect[0],textrect[3]-textrect[1]))
text_img.clear(0)
text_img.text((-textrect[0],-textrect[1]),labeltext,fill=0xffffff,font='normal')
while running:
    img.clear(0)
    img.blit(text_img, (0,0))
    img.point((location[0]+blobsize/2,location[1]+blobsize/2),
              0x00ff00,width=blobsize)
    handle_redraw(())
    # Yield so that pending key events and the exit handler get to run.
    e32.ao_yield()
    # Slight damping, then gravity, then move.
    speed[0]*=0.999
    speed[1]*=0.999
    speed[1]+=gravity
    location[0]+=speed[0]
    location[1]+=speed[1]
    # Bounce off each edge: mirror the overshoot back inside and reverse
    # the perpendicular velocity while losing some energy.
    if location[0]>xs:
        location[0]=xs-(location[0]-xs)
        speed[0]=-0.80*speed[0]
        speed[1]=0.90*speed[1]
    if location[0]<0:
        location[0]=-location[0]
        speed[0]=-0.80*speed[0]
        speed[1]=0.90*speed[1]
    if location[1]>ys:
        location[1]=ys-(location[1]-ys)
        speed[0]=0.90*speed[0]
        speed[1]=-0.80*speed[1]
    if location[1]<0:
        location[1]=-location[1]
        speed[0]=0.90*speed[0]
        speed[1]=-0.80*speed[1]
    if keyboard.is_down(EScancodeLeftArrow): speed[0] -= acceleration
    if keyboard.is_down(EScancodeRightArrow): speed[0] += acceleration
    if keyboard.is_down(EScancodeDownArrow): speed[1] += acceleration
    if keyboard.is_down(EScancodeUpArrow): speed[1] -= acceleration
    # The hash key saves the current frame as a screenshot.
    if keyboard.pressed(EScancodeHash):
        filename=u'e:\\screenshot.png'
        canvas.text((0,32),u'Saving screenshot to:',fill=0xffff00)
        canvas.text((0,48),filename,fill=0xffff00)
        img.save(filename)
    n_frames+=1
end_time=time.clock()
total=end_time-start_time
print("%d frames, %f seconds, %f FPS, %f ms/frame."%(n_frames,total,
                                                     n_frames/total,
                                                     total/n_frames*1000.))
Run Application from host
import os
# os.execl() replaces the current process and does not return on success,
# so the message has to be printed before the call.  The first argument
# after the path becomes argv[0] (the program name); the document to open
# follows it.
print("Running notepad")
os.execl("c:/Windows/Notepad.exe", "c:/Windows/Notepad.exe", "c:/userlog.txt")
#or
import subprocess
# subprocess.call() waits for the program to finish and then returns.
subprocess.call("c:/Windows/Notepad.exe")
print("Running notepad")
#starting a Web browser:
import os
os.system('/usr/bin/firefox')
os.system(r'c:\"Program Files"\"Mozilla Firefox"\firefox.exe')
#Windows specific function os.startfile:
import os
# The path must not start with a space.
os.startfile(r'c:\Program Files\Mozilla Firefox\firefox.exe')
Multithreading in Python
This code gives you a clear view of how useful Python is for threading compared to other programming languages.
import threading,Queue,time,sys,traceback
#Globals (start with a capital letter)
# Work items waiting to be processed, stored as (flag, data) pairs.
Qin = Queue.Queue()
# Results produced by the worker threads.
Qout = Queue.Queue()
# Error descriptions recorded by workers whose processing raised.
Qerr = Queue.Queue()
# Thread objects started by start_threads().
Pool = []
def err_msg():
    """Describe the exception currently being handled.

    Returns a 3-tuple of strings: the formatted traceback list, the
    exception type and the exception value ('' if it cannot be converted).
    """
    # sys.exc_info() is per-thread; the old sys.exc_type / sys.exc_value
    # globals are shared between threads and no longer exist in Python 3.
    exc_type, exc_obj, trace = sys.exc_info()
    try:
        exc_value=str(exc_obj)
    except Exception:
        exc_value=''
    return str(traceback.format_tb(trace)),str(exc_type),exc_value
def get_errors():
    """Yield every error currently queued in Qerr without blocking."""
    try:
        while 1:
            yield Qerr.get_nowait()
    except Queue.Empty:
        # Queue drained: end the generator.
        pass
def process_queue():
    """Worker loop: process items from Qin until a 'stop' flag arrives.

    For every ('ok', item) pair, 'new' + item is put on Qout.  Any failure
    while processing is recorded on Qerr and the loop continues.
    """
    flag='ok'
    while flag !='stop':
        try:
            flag,item=Qin.get() #will wait here!
            if flag=='ok':
                newdata='new'+item
                Qout.put(newdata)
        except Exception:
            # Only ordinary errors are recorded; SystemExit and
            # KeyboardInterrupt are no longer swallowed by a bare except.
            Qerr.put(err_msg())
def start_threads(amount=5):
    """Start *amount* worker threads running process_queue; add them to Pool."""
    for _ in range(amount):
        thread = threading.Thread(target=process_queue)
        thread.start()
        Pool.append(thread)
def put(data,flag='ok'):
    """Queue *data* for the workers; *flag* 'ok' means "process this item"."""
    Qin.put([flag,data])
def get():
    """Return the next result from Qout, blocking until one is available."""
    result = Qout.get() #will wait here!
    return result
def get_all():
    """Yield every result currently queued in Qout without blocking."""
    try:
        while 1:
            yield Qout.get_nowait()
    except Queue.Empty:
        # Queue drained: end the generator.
        pass
def stop_threads():
    """Ask every pooled worker to stop; return once all have finished.

    Pool is empty afterwards, so start_threads() can be called again.
    """
    # One 'stop' item per worker: each worker consumes exactly one and exits.
    for _ in range(len(Pool)):
        Qin.put(('stop',None))
    # join() blocks until the thread ends, replacing the earlier one-second
    # polling loop that deleted from Pool while enumerating it.
    while Pool:
        Pool.pop().join()
#STANDARD use:
# Queue the work first, then start the workers and wait for them to finish.
for i in ('b','c'): put(i)
start_threads()
stop_threads()
for i in get_all(): print i
for i in get_errors(): print i
#POOL use
#put element into input queue
put('a')
#setup threads -- will run forever as a pool until you shutdown
start_threads()
for i in ('b','c'): put(i)
#get an element from output queue
print get()
#put even more data in, 7 causes an error
# ('new' + 7 raises in process_queue, which records the failure on Qerr)
for i in ('d','e',7): put(i)
#get whatever is available
for i in get_all(): print i
#stop_threads only returns when all threads have stopped
stop_threads()
print '__threads finished last data available__'
for i in get_all(): print i
for i in get_errors(): print i
#starting up threads again
start_threads()
put('f')
stop_threads()
print '__threads finished(again) last data available__'
for i in get_all(): print i
for i in get_errors(): print i
This utility determines the size of a folder and its subfolders in MB
This simple Python snippet reports the total size, in MB, of a parent folder including all of its subfolders. It is built on Python's os module.
# determine size of a given folder in MBytes
import os
def folder_size_bytes(folder):
    """Return the total size in bytes of all files below *folder*.

    Subfolders are included.  A folder that does not exist yields 0,
    because os.walk() ignores listing errors by default.
    """
    total = 0
    for path, _dirs, files in os.walk(folder):
        for name in files:
            total += os.path.getsize(os.path.join(path, name))
    return total


# pick a folder you have ...
folder = 'D:\\zz1'
folder_size = folder_size_bytes(folder)
# Dividing by a float keeps the fractional part of the MB figure.
print("Folder = %0.1f MB" % (folder_size/(1024*1024.0)))
Using UNIX Environment Variables From Python
import os
# Shell script text: drop TEST, export GLOBAL_VALUE, then run "sample"
# inside that environment.
command = ("unset TEST " + "\n"
           "export GLOBAL_VALUE='global_value'" + "\n"
           "sample\n")
scr_name = "temp.tmp"
scr_file = open(scr_name, 'w')  # write everything to a temporary script
try:
    scr_file.write(command)
finally:
    scr_file.close()
try:
    # 0o744 is the octal spelling accepted by Python 2.6+ and Python 3.
    os.chmod(scr_name, 0o744)  # mode -rwxr--r--
    # A bare file name is looked up on PATH, not in the current directory,
    # so the script is run through an explicit "./" path.
    status = os.system(os.path.join(os.curdir, scr_name))
finally:
    # Remove the temporary script even if running it failed.
    os.remove(scr_name)
CVS Macro for Commited Files
from cvsgui.Macro import *
from cvsgui.CvsEntry import *
from cvsgui.ColorConsole import *
import cvsgui.App
import os.path, os, shutil
import zipfile, string
import re
from cvsgui.Persistent import *
from cvsgui.MenuMgr import *
from cvsgui.SafeTk import *
# WinCvs-style macro: asks for a destination folder, a start date and a user
# name, queries "cvs history" and copies the files it reports into a
# "delivery" tree under the destination folder.
# NOTE(review): the indentation of this snippet was lost, so how the
# "if res ..." checks nest cannot be read off the text; restore it against
# the original macro before use.
class MyConfig(Macro):
def __init__(self):
Macro.__init__(self, "My Utility", MACRO_SELECTION,
0, "My Menu")
# Persistent values: key name, default text shown in the prompt, and a
# third argument whose meaning is not visible here.
self.m_path = Persistent("PY_PATH", "select the destination folder", 1)
self.m_date = Persistent("PY_DATE", "dd-mmm-yyyy", 1)
self.m_name = Persistent("PY_NAME","name",1)
def OnCmdUI(self, cmdui):
# The menu entry is always enabled.
cmdui.Enable(1)
def Run(self):
msg = "Please enter the path for delivery :"
msg1 = "Please enter the Date from when you want modified files :"
msg2 = "Please enter the user name whose commited files have to be created in directory structure"
title = "Path"
# Start every prompt from the stored value.
outputpath = str(self.m_path)
datestr = str(self.m_date)
namestr = str(self.m_name)
res, outputpath = cvsgui.App.PromptEditMessage(msg, outputpath, title)
if res and len(outputpath) > 0:
# "<<" stores the entered value back into the Persistent object.
self.m_path << outputpath
res,datestr = cvsgui.App.PromptEditMessage(msg1, datestr, title)
if res and len(datestr) > 0:
self.m_date << datestr
res,namestr = cvsgui.App.PromptEditMessage(msg2, namestr, title)
if res and len(namestr) > 0:
self.m_name << namestr
cvs = Cvs(1)
console = ColorConsole()
console << kMagenta << datestr << "\n" << kNormal
console << kMagenta << "Username:" << namestr << "\n" << kNormal
# NOTE(review): _cvsgui is not imported by this snippet; confirm whether
# cvsgui.App.PromptMessage was intended.
okCancel = _cvsgui.PromptMessage("Shall I Continue", "Message", "OK", "Cancel")
if okCancel == 0:
return
# Ask CVS for history records of types A, C and M for the given user since
# the given date.
code, out, err = cvs.Run("history", "-xACM", "-D%s" % datestr, "-u%s" % namestr)
lines= string.split(out, '\n')
console << kMagenta << "Copying files : " <<"\n"
for line in lines:
#console << kMagenta << line << "\n" << kNormal
# Group 1 is the file name, group 2 the directory (must start with
# PW_Portal).
mobj = re.match( "^[MCA] \d\d\d\d-\d\d-\d\d \d\d:\d\d \+0000 [a-z_]+ +[\d.]+ +(.*) +(PW_Portal.*) +==",
line)
if mobj:
file_name = mobj.group(1).strip()
dir_name = mobj.group(2).strip()
#console << kBlue << dir_name << "/" << file_name << "\n" << kNormal
# The source is read from the hard-coded working copy at c:/work/.
srcname = os.path.join("c:/work/", dir_name + '/'+ file_name)
targetname = os.path.join(outputpath, "delivery"+ '/'+ dir_name)
if not os.path.exists(targetname):
os.makedirs(targetname)
shutil.copy(srcname, targetname)
console << kGreen << srcname << " to " << targetname <<"\n" << kNormal
# Instantiate the macro when this file is loaded.
MyConfig()
Generating unique id's
The script is developed in Python. It sets a unique identifier for every database. It can be reused to assign unique ids (alphanumeric values) to any set of data.
def generate_grpid(dbname):
    """Return the unique integer id assigned to the database *dbname*.

    Raises KeyError if *dbname* is not one of the known databases.
    """
    # Named group_ids rather than "dict" so the builtin is not shadowed.
    group_ids = {'pdb':0,
                 'vast':1,
                 'taxonomy':2,
                 'genbank':3,
                 'dbSNP':4,
                 'enzyme':5,
                 'uniprot':6,
                 'go':7,
                 'interpro':8,
                 'rebase':9,
                 'prosite':10,
                 'pdb-hetero':11,
                 }
    x = group_ids[dbname]
    # 'x' is the unique id returned.
    return x
Bind Key action: Return
Bind Key action: Return
from Tkinter import *
root = Tk()


def greet(*ignore):
    """Print a greeting; the event object passed by Tk is ignored."""
    print('Hello World')


# Bind the Return key, as the snippet's title says.  The event sequence had
# been stripped to an empty string, which names no event.
root.bind('<Return>', greet)
root.mainloop()
Write CSV File Using Python
import csv
# Write two rows to eggs.csv, separated by spaces and quoted with '|' only
# where a field contains the delimiter.
csv_file = open('eggs.csv', 'w')
try:
    # QUOTE_MINIMAL lives in the csv module; the bare name is undefined.
    spamWriter = csv.writer(csv_file, delimiter=' ',
                            quotechar='|', quoting=csv.QUOTE_MINIMAL)
    spamWriter.writerow(['Spam'] * 5 + ['Baked Beans'])
    spamWriter.writerow(['Spam', 'Lovely Spam', 'Wonderful Spam'])
finally:
    # Closing flushes the rows to disk even if a write fails.
    csv_file.close()
Read CSV File using Python
import csv
# Read eggs.csv back and print each row with its fields joined by ", ".
csv_file = open('eggs.csv')
try:
    spamReader = csv.reader(csv_file, delimiter=' ', quotechar='|')
    for row in spamReader:
        print(', '.join(row))
finally:
    csv_file.close()
# Spam, Spam, Spam, Spam, Spam, Baked Beans
# Spam, Lovely Spam, Wonderful Spam
Create an HTML message with an alternative plain text version
Here is a Python Program to create an HTML message with an alternative plain text version
#!/usr/bin/env python
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
# me == my email address
# you == recipient's email address
me = "my@email.com"
you = "your@email.com"
# Create message container - the correct MIME type is multipart/alternative.
msg = MIMEMultipart('alternative')
msg['Subject'] = "Link"
msg['From'] = me
msg['To'] = you
# Create the body of the message (a plain-text and an HTML version).
text = "Hi!\nHow are you?\nHere is the link you wanted:\nhttp://www.python.org"
# The HTML alternative needs real markup; without the tags it was only a
# second copy of the plain text with the link missing.
html = """\
<html>
  <head></head>
  <body>
    <p>Hi!<br>
       How are you?<br>
       Here is the <a href="http://www.python.org">link</a> you wanted.
    </p>
  </body>
</html>
"""
# Record the MIME types of both parts - text/plain and text/html.
part1 = MIMEText(text, 'plain')
part2 = MIMEText(html, 'html')
# Attach parts into message container.
# According to RFC 2046, the last part of a multipart message, in this case
# the HTML message, is best and preferred.
msg.attach(part1)
msg.attach(part2)
# Send the message via local SMTP server.
s = smtplib.SMTP('localhost')
try:
    # sendmail function takes 3 arguments: sender's address, recipient's
    # address and message to send - here it is sent as one string.
    s.sendmail(me, you, msg.as_string())
finally:
    # End the SMTP session even when sending fails.
    s.quit()
To Send the entire contents of directory as an email Message.
Here is a Python Program to send the entire contents of a directory as an email message
#!/usr/bin/env python
"""Send the contents of a directory as a MIME message."""
import os
import sys
import smtplib
# For guessing MIME type based on file name extension
import mimetypes
from optparse import OptionParser
from email import encoders
from email.message import Message
from email.mime.audio import MIMEAudio
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
COMMASPACE = ', '


def _build_part(path):
    """Return a MIME part holding the file at *path*, typed by its extension."""
    # Guess the content type based on the file's extension.  Encoding
    # will be ignored, although we should check for simple things like
    # gzip'd or compressed files.
    ctype, encoding = mimetypes.guess_type(path)
    if ctype is None or encoding is not None:
        # No guess could be made, or the file is encoded (compressed), so
        # use a generic bag-of-bits type.
        ctype = 'application/octet-stream'
    maintype, subtype = ctype.split('/', 1)
    if maintype == 'text':
        # Note: we should handle calculating the charset
        with open(path) as fp:
            return MIMEText(fp.read(), _subtype=subtype)
    # Everything else is read as bytes; "with" closes the file even if
    # building the part fails.
    with open(path, 'rb') as fp:
        payload = fp.read()
    if maintype == 'image':
        return MIMEImage(payload, _subtype=subtype)
    if maintype == 'audio':
        return MIMEAudio(payload, _subtype=subtype)
    part = MIMEBase(maintype, subtype)
    part.set_payload(payload)
    # Encode the payload using Base64
    encoders.encode_base64(part)
    return part


def main():
    """Mail, or write to a file, every regular file of a directory.

    Options are read from the command line.  A sender and at least one
    recipient are required; otherwise the help text is printed and the
    program exits with status 1.
    """
    parser = OptionParser(usage="""\
Send the contents of a directory as a MIME message.
Usage: %prog [options]
Unless the -o option is given, the email is sent by forwarding to your local
SMTP server, which then does the normal delivery process. Your local machine
must be running an SMTP server.
""")
    parser.add_option('-d', '--directory',
                      type='string', action='store',
                      help="""Mail the contents of the specified directory,
otherwise use the current directory. Only the regular
files in the directory are sent, and we don't recurse to
subdirectories.""")
    parser.add_option('-o', '--output',
                      type='string', action='store', metavar='FILE',
                      help="""Print the composed message to FILE instead of
sending the message to the SMTP server.""")
    parser.add_option('-s', '--sender',
                      type='string', action='store', metavar='SENDER',
                      help='The value of the From: header (required)')
    parser.add_option('-r', '--recipient',
                      type='string', action='append', metavar='RECIPIENT',
                      default=[], dest='recipients',
                      help='A To: header value (at least one required)')
    opts, args = parser.parse_args()
    if not opts.sender or not opts.recipients:
        parser.print_help()
        sys.exit(1)
    directory = opts.directory
    if not directory:
        directory = '.'
    # Create the enclosing (outer) message
    outer = MIMEMultipart()
    outer['Subject'] = 'Contents of directory %s' % os.path.abspath(directory)
    outer['To'] = COMMASPACE.join(opts.recipients)
    outer['From'] = opts.sender
    outer.preamble = 'You will not see this in a MIME-aware mail reader.\n'
    for filename in os.listdir(directory):
        path = os.path.join(directory, filename)
        if not os.path.isfile(path):
            continue
        msg = _build_part(path)
        # Set the filename parameter
        msg.add_header('Content-Disposition', 'attachment', filename=filename)
        outer.attach(msg)
    # Now send or store the message
    composed = outer.as_string()
    if opts.output:
        with open(opts.output, 'w') as fp:
            fp.write(composed)
    else:
        s = smtplib.SMTP('localhost')
        try:
            s.sendmail(opts.sender, opts.recipients, composed)
        finally:
            s.quit()


if __name__ == '__main__':
    main()
To send mail containing Pictures | Python Example
Here is a Python Program to send a MIME message containing a bunch of family pictures that may be residing in a directory
# Import smtplib for the actual sending function
import smtplib
# Here are the email package modules we'll need
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
COMMASPACE = ', '
# Create the container (outer) email message.
msg = MIMEMultipart()
msg['Subject'] = 'Our family reunion'
# me == the sender's email address
# family = the list of all recipients' email addresses
msg['From'] = me
msg['To'] = COMMASPACE.join(family)
msg.preamble = 'Our family reunion'
# Assume we know that the image files are all in PNG format
for png_path in pngfiles:
    # Open the files in binary mode.  Let the MIMEImage class automatically
    # guess the specific image type.
    fp = open(png_path, 'rb')
    try:
        img = MIMEImage(fp.read())
    finally:
        # Close the file even if it cannot be read or recognised.
        fp.close()
    msg.attach(img)
# Send the email via our own SMTP server.
s = smtplib.SMTP('localhost')
try:
    s.sendmail(me, family, msg.as_string())
finally:
    s.quit()
How to create and send a simple text message
Here is a Python Program to create and send a simple text message:
# Import smtplib for the actual sending function
import smtplib
# Import the email modules we'll need
from email.mime.text import MIMEText
# Open a plain text file for reading.  For this example, assume that
# the text file contains only ASCII characters.
fp = open(textfile, 'rb')
try:
    # Create a text/plain message
    msg = MIMEText(fp.read())
finally:
    # Close the file even if building the message fails.
    fp.close()
# me == the sender's email address
# you == the recipient's email address
msg['Subject'] = 'The contents of %s' % textfile
msg['From'] = me
msg['To'] = you
# Send the message via our own SMTP server, but don't include the
# envelope header.
s = smtplib.SMTP('localhost')
try:
    s.sendmail(me, [you], msg.as_string())
finally:
    s.quit()
Close a file after using | Python Example
import sys
def open_file(file_name, mode):
    """Open a file and return the file object.

    If the file cannot be opened, the error is reported, the user is asked
    to press enter, and the program exits.
    """
    try:
        the_file = open(file_name, mode)
    except IOError as e:
        print("Unable to open the file %s Ending program.\n%s" % (file_name, e))
        raw_input("\n\nPress the enter key to exit.")
        sys.exit()
    else:
        return the_file
def next_line(the_file):
    """Return next line from the trivia file, formatted.

    Every "/" in the line is replaced by a newline.
    """
    line = the_file.readline()
    line = line.replace("/", "\n")
    return line
trivia_file = open_file("trivia.txt", "r")
try:
    title = next_line(trivia_file)
finally:
    # Close the file even if reading the title fails.
    trivia_file.close()
Python Program to Close the File
Here is a Python Program to Close the File
# Open files consume system resources, and depending on the file mode,
# other programs may not be able to access them.
# It's important to close files as soon as you're finished with them, as
# shown below.
f = open("/music/_singles/kairo.mp3", "rb")
# Before close(): the file is still open, so closed is False.
print f.closed
f.close()
print f
# After close(): closed is True.
print f.closed
# The closed attribute of a file object indicates whether the file has been
# closed.
Python Program to Reading Binary Files
# Read the last 128 bytes of a binary file, printing the file position
# before and after each step.
f = open("/music/_singles/kairo.mp3", "rb")
try:
    print(f)
    print(f.tell())
    # whence=2 seeks relative to the end of the file.
    f.seek(-128, 2)
    print(f.tell())
    tagData = f.read(128)
    print(tagData)
    print(f.tell())
finally:
    # The file was never closed before.
    f.close()
Python Program to Working with Directories
# The os.path module has several functions for manipulating files and directories.
# Here, we'll look at handling pathnames and listing the contents of a directory.
import os
# The same two path components are joined with and without a trailing
# separator on the folder.
print os.path.join("c:\\yourfolder\\", "yourfile.txt")
print os.path.join("c:\\yourfolder", "yourfile.txt")
# The result of this call is discarded; the next line prints the expanded
# home directory joined with "Python".
os.path.expanduser("~")
print os.path.join(os.path.expanduser("~"), "Python")
# os.path is a reference to a module-which module depends on your platform. Just
# as getpass encapsulates differences between platforms by setting getpass to a
# platform-specific function, os encapsulates differences between platforms by
# setting path to a platform-specific module.
Python Program to Change Directory
Here is a Python Program to Change Directory
import os
# Change the current working directory of this process.
os.chdir('/server/accesslogs')
#Be sure to use the "import os" style instead of "from os import *". This will keep
#os.open() from shadowing the builtin open() function which operates much
#differently.
Listing Directories with glob | Python Examples
import os
# List everything in the root of drive C.  The results of the calls below
# are discarded; print them to see the matches.
os.listdir("c:\\")
import glob
# Wildcard match within a single directory.
glob.glob('c:\\*.mp3')
# NOTE(review): this line repeats the previous pattern; the original
# example probably used a different one here.
glob.glob('c:\\*.mp3')
# A wildcard may also stand in for a directory level.
glob.glob('c:\\c\\*\\*.mp3')
Python Program to Listing Directories
import os
os.listdir("c:\\")
dirname = "c:\\"
os.listdir(dirname)
# The trailing "[3]" and "[4]" were footnote markers from the source text.
# Left in the code they indexed the list, printing one entry or raising
# IndexError.
# Regular files directly inside dirname:
print([f for f in os.listdir(dirname) if os.path.isfile(os.path.join(dirname, f))])
# Sub-directories directly inside dirname:
print([f for f in os.listdir(dirname) if os.path.isdir(os.path.join(dirname, f))])
Mail Alert using SMTP in python
import smtplib
#to send the email to concerned persons
def Send_Mail(TEXT):
    """Send *TEXT* as an alert mail with the subject "ALERT!".

    SMTP errors are reported on standard output instead of being raised.
    The sender, receivers and server below are placeholders to replace.
    """
    sender = 'sender_mail_address in single quotes '
    receivers = ['firstperson mailaddress in single quotes ','secondperson mail address in single quotes ']
    # Prepare actual message.  A blank line must separate the headers from
    # the body; without it the text is read as part of the header block.
    message = "Subject:ALERT!\n\n"
    message=message+TEXT
    try:
        smtpObj = smtplib.SMTP(' SMTP MAIL SERVER in single quotes ',25)
        try:
            smtpObj.sendmail(sender, receivers, message)
        finally:
            # Release the connection even if sending fails.
            smtpObj.close()
    except smtplib.SMTPConnectError:
        print("Error: unable to send email by connect")
    except smtplib.SMTPException:
        print("Error: unable to send email")


#main
Send_Mail("enter you text here..")
Inserting into MySQL Database Using Python Program
import MySQLdb
import sys

db= MySQLdb.connect(host="localhost", user="python-test", passwd="python",
                    db="python-test")
try:
    title = raw_input("Please enter a book title: ")
    if title == "" :
        # A bare "exit" only names the function; it has to be called.
        sys.exit()
    author = raw_input("Please enter the author's name: ")
    pubdate = int( raw_input("Enter the publication year: ") )
except ValueError:
    # Only a non-numeric year lands here.  A bare except would also have
    # caught the SystemExit raised above.
    print("Invalid value")
    sys.exit(1)
print("Title: [" + title + "]")
print("Author: ["+ author + "]")
print("Publication Date: " + str(pubdate))
cursor = db.cursor()
# The values are passed as parameters so the driver escapes them; building
# the statement by concatenation allowed SQL injection and broke on quotes.
stmt = ("INSERT INTO Books (BookName, BookAuthor, PublicationDate) "
        "VALUES (%s, %s, %s)")
cursor.execute(stmt, (title, author, pubdate))
print("Record added!")
cursor.close ()
db.commit ()
Updating a MySQL Database with Python Language | Python Example
import MySQLdb
import MySQLdb.cursors
def get_column_name( data, prompt, names ) :
    """Let the user pick a column; return its 1-based position.

    data   -- iterable of column names (iterating a dict row yields its keys)
    prompt -- text shown when asking for the number
    names  -- list that receives the column names in the displayed order

    The menu is shown again until a number within range is entered.
    """
    columns = list(data)
    # Record the names once; appending on every retry duplicated them.
    names.extend(columns)
    value=-1
    while value == -1:
        for idx, col in enumerate(columns, 1):
            print(str(idx) + ": " + col)
        try:
            value = int( raw_input(prompt) )
        except ValueError:
            # Non-numeric input: ask again instead of crashing.
            value = -1
            continue
        if value < 1 or value > len(columns) :
            value = -1
    return value
conn = MySQLdb.Connect(
    host='localhost', user='python-test',
    passwd='python', db='python-test')
# A DictCursor returns each row as a dict keyed by column name.
cursor = conn.cursor(cursorclass=MySQLdb.cursors.DictCursor)
cursor.execute("SELECT * FROM books")
# One row is enough to learn the column names.
data = cursor.fetchone()
names = []
old_value = get_column_name( data, "Which column do you want to change records for? ", names )
names = []
new_value = get_column_name( data, "Which column do you want to change records to? ", names )
old_val = raw_input("What value do you want to change for " + names[old_value-1] + ": ")
new_val = raw_input("What value do you want to change to for " + names[new_value-1] + ": ")
# Column names come from the result set and cannot be parameters.  The
# user-entered values are passed separately so the driver escapes them.
stmt = "UPDATE books SET " + names[new_value-1] + " = %s WHERE " + names[old_value-1] + " = %s"
print(stmt)
cursor.execute(stmt, (new_val, old_val))
print("Rows affected: " + str(cursor.rowcount))
cursor.close()
conn.commit()
conn.close()
Reading Data from MySQL Database | Python Example
import MySQLdb
db= MySQLdb.connect(host="localhost", user="python-test", passwd="python",
                    db="python-test")
cursor = db.cursor()
stmt = "SELECT * from books"
cursor.execute(stmt)
rows = cursor.fetchall ()
# Print every column of every row, one value per line.
for row in rows:
    print("Row: ")
    for col in row :
        print("Column: %s" % (col))
    print("End of Row")
print("Number of rows returned: %d" % cursor.rowcount)
cursor.close()
db.close()
LED Control Using Python Program
#/*
#Python and Tkinter Programming
#John E. Grayson
#ISBN: 1884777813
#Publisher: Manning
#*/
from Tkinter import *
# LED shapes
SQUARE = 1
ROUND = 2
ARROW = 3
# Direction an ARROW LED points in
POINT_DOWN = 0
POINT_UP = 1
POINT_RIGHT = 2
POINT_LEFT = 3
# LED states; each one maps to a fill color in LED.update()
STATUS_OFF = 1
STATUS_ON = 2
STATUS_WARN = 3
STATUS_ALARM = 4
STATUS_SET = 5


class DummyClass:
    """Empty class used only as a namespace for attributes."""
    pass


# Color palette, accessed as Color.NAME
Color = DummyClass()
Color.PANEL = '#545454'
Color.OFF = '#656565'
Color.ON = '#00FF33'
Color.WARN = '#ffcc00'
Color.ALARM = '#ff4422'
class LED:
    """A colored indicator light drawn on a Tkinter canvas.

    The widget lives in self.frame, which the caller packs or grids.  The
    light is a rectangle (SQUARE), an oval (ROUND) or a triangle (any other
    shape value, i.e. ARROW), filled according to self.status.
    """

    def __init__(self, master=None, width=25, height=25,
                 appearance=FLAT,
                 status=STATUS_ON, bd=1,
                 bg=None,
                 shape=SQUARE, outline="",
                 blink=0, blinkrate=1,
                 orient=POINT_UP,
                 takefocus=0):
        # preserve attributes
        self.master = master
        self.shape = shape
        self.onColor = Color.ON
        self.offColor = Color.OFF
        self.alarmColor = Color.ALARM
        self.warningColor = Color.WARN
        self.specialColor = '#00ffdd'
        self.status = status
        self.blink = blink
        self.blinkrate = int(blinkrate)
        # Blink phase: 1 while the lit half of the cycle is showing.
        self.on = 0
        # Status to return to during the lit phase / when blinking stops.
        self.onState = None
        if not bg:
            bg = Color.PANEL
        ## Base frame to contain light
        self.frame=Frame(master, relief=appearance, bg=bg, bd=bd,
                         takefocus=takefocus)
        basesize = width
        d = center = int(basesize/2)
        if self.shape == SQUARE:
            self.canvas=Canvas(self.frame, height=height, width=width,
                               bg=bg, bd=0, highlightthickness=0)
            self.light=self.canvas.create_rectangle(0, 0, width, height,
                                                    fill=Color.ON)
        elif self.shape == ROUND:
            r = int((basesize-2)/2)
            self.canvas=Canvas(self.frame, width=width, height=width,
                               highlightthickness=0, bg=bg, bd=0)
            if bd > 0:
                # Draw a border ring and shrink the light to fit inside it.
                self.border=self.canvas.create_oval(center-r, center-r,
                                                    center+r, center+r)
                r = r - bd
            self.light=self.canvas.create_oval(center-r-1, center-r-1,
                                               center+r, center+r, fill=Color.ON,
                                               outline=outline)
        else:  # Default is an ARROW
            self.canvas=Canvas(self.frame, width=width, height=width,
                               highlightthickness=0, bg=bg, bd=0)
            x = d
            y = d
            if orient == POINT_DOWN:
                self.light=self.canvas.create_polygon(x-d,y-d, x,y+d,
                                x+d,y-d, x-d,y-d, outline=outline)
            elif orient == POINT_UP:
                self.light=self.canvas.create_polygon(x,y-d, x-d,y+d,
                                x+d,y+d, x,y-d, outline=outline)
            elif orient == POINT_RIGHT:
                self.light=self.canvas.create_polygon(x-d,y-d, x+d,y,
                                x-d,y+d, x-d,y-d, outline=outline)
            elif orient == POINT_LEFT:
                self.light=self.canvas.create_polygon(x-d,y, x+d,y+d,
                                x+d,y-d, x-d,y, outline=outline)
        self.canvas.pack(side=TOP, fill=X, expand=NO)
        self.update()

    # The four status setters redraw immediately only when the LED is not
    # blinking; a blinking LED is redrawn by its own timer.
    def turnon(self):
        self.status = STATUS_ON
        if not self.blink: self.update()

    def turnoff(self):
        self.status = STATUS_OFF
        if not self.blink: self.update()

    def alarm(self):
        self.status = STATUS_ALARM
        if not self.blink: self.update()

    def warn(self):
        self.status = STATUS_WARN
        if not self.blink: self.update()

    def set(self, color):
        """Show the arbitrary fill *color* (status STATUS_SET)."""
        self.status = STATUS_SET
        self.specialColor = color
        self.update()

    def blinkon(self):
        """Start blinking, remembering the current status as the lit state."""
        if not self.blink:
            self.blink = 1
            self.onState = self.status
            self.update()

    def blinkoff(self):
        """Stop blinking and restore the remembered status."""
        if self.blink:
            self.blink = 0
            self.status = self.onState
            self.onState = None
            self.on = 0
            self.update()

    def blinkstate(self, blinkstate):
        """Turn blinking on for a true *blinkstate*, off otherwise."""
        if blinkstate:
            self.blinkon()
        else:
            self.blinkoff()

    def update(self):
        """Redraw the light; while blinking, toggle phase and reschedule."""
        # First do the blink, if set to blink
        if self.blink:
            if self.on:
                if not self.onState:
                    self.onState = self.status
                self.status = STATUS_OFF
                self.on = 0
            else:
                if self.onState:
                    self.status = self.onState     # Current ON color
                self.on = 1
        if self.status == STATUS_ON:
            self.canvas.itemconfig(self.light, fill=self.onColor)
        elif self.status == STATUS_OFF:
            self.canvas.itemconfig(self.light, fill=self.offColor)
        elif self.status == STATUS_WARN:
            self.canvas.itemconfig(self.light, fill=self.warningColor)
        elif self.status == STATUS_SET:
            self.canvas.itemconfig(self.light, fill=self.specialColor)
        else:
            self.canvas.itemconfig(self.light, fill=self.alarmColor)
        self.canvas.update_idletasks()
        if self.blink:
            # blinkrate is in seconds; after() takes milliseconds.
            self.frame.after(self.blinkrate * 1000, self.update)
if __name__ == '__main__':
    class TestLEDs(Frame):
        """Demo window: one row per LED type, one LED per state in each row."""

        def __init__(self, parent=None):
            # List of Colors and Blink On/Off
            states = [(STATUS_OFF,   0),
                      (STATUS_ON,    0),
                      (STATUS_WARN,  0),
                      (STATUS_ALARM, 0),
                      (STATUS_SET,   0),
                      (STATUS_ON,    1),
                      (STATUS_WARN,  1),
                      (STATUS_ALARM, 1),
                      (STATUS_SET,   1)]
            # List of LED types to display,
            # with sizes and other attributes:
            # (shape, width, height, relief, border, orientation, outline)
            leds = [(ROUND,  25, 25, FLAT,   0, None, ""),
                    (ROUND,  15, 15, RAISED, 1, None, ""),
                    (SQUARE, 20, 20, SUNKEN, 1, None, ""),
                    (SQUARE,  8,  8, FLAT,   0, None, ""),
                    (SQUARE,  8,  8, RAISED, 1, None, ""),
                    (SQUARE, 16,  8, FLAT,   1, None, ""),
                    (ARROW,  14, 14, RIDGE,  1, POINT_UP, ""),
                    (ARROW,  14, 14, RIDGE,  0, POINT_RIGHT, ""),
                    (ARROW,  14, 14, FLAT,   0, POINT_DOWN, "white")]
            Frame.__init__(self)        # Do superclass init
            self.pack()
            self.master.title('LED Example - Stage 1')
            # Iterate for each type of led
            for shape, w, h, app, bd, orient, outline in leds:
                frame = Frame(self, bg=Color.PANEL)
                frame.pack(anchor=N, expand=YES, fill=X)
                # Iterate for selected states
                for state, blink in states:
                    LED(frame, shape=shape, status=state,
                        width=w, height=h, appearance=app,
                        orient=orient, blink=blink, bd=bd,
                        outline=outline).frame.pack(side=LEFT,
                                                    expand=YES, padx=1, pady=1)

    TestLEDs().mainloop()
Make a Simple Calculator using Python Language | Python Example
from Tkinter import *
import Pmw, string
class SLabel(Frame):
    """ SLabel defines a 2-sided label within a Frame. The
        left hand label has blue letters the right has white letters """

    def __init__(self, master, leftl, rightl):
        Frame.__init__(self, master, bg='gray40')
        self.pack(side=LEFT, expand=YES, fill=BOTH)
        Label(self, text=leftl, fg='steelblue1',
              font=("arial", 6, "bold"), width=5, bg='gray40').pack(
                  side=LEFT, expand=YES, fill=BOTH)
        Label(self, text=rightl, fg='white',
              font=("arial", 6, "bold"), width=1, bg='gray40').pack(
                  side=RIGHT, expand=YES, fill=BOTH)
class Key(Button):
    """A calculator key: a Button with preset font, color, width and border
    that packs itself to the left of its row."""

    def __init__(self, master, font=('arial', 8, 'bold'),
                 fg='white',width=5, borderwidth=5, **kw):
        kw['font'] = font
        kw['fg'] = fg
        kw['width'] = width
        kw['borderwidth'] = borderwidth
        # Direct call with ** unpacking; the apply() builtin used before is
        # deprecated and was removed in Python 3.
        Button.__init__(self, master, **kw)
        self.pack(side=LEFT, expand=NO, fill=NONE)
class Calculator(Frame):
    """Calculator window: a scrolled text display above eight rows of keys.

    Typed or clicked input accumulates in self.current and is handed to an
    Evaluator when Enter is used.
    """

    def __init__(self, parent=None):
        Frame.__init__(self, bg='gray40')
        self.pack(expand=YES, fill=BOTH)
        self.master.title('Tkinter Toolkit TT-42')
        self.master.iconname('Tk-42')
        self.calc = Evaluator()        # This is our evaluator
        self.buildCalculator()         # Build the widgets
        # This is an incomplete dictionary - a good exercise!
        self.actionDict = {'second': self.doThis, 'mode': self.doThis,
                           'delete': self.doThis, 'alpha': self.doThis,
                           'stat': self.doThis, 'math': self.doThis,
                           'matrix': self.doThis, 'program': self.doThis,
                           'vars': self.doThis, 'clear': self.clearall,
                           'sin': self.doThis, 'cos': self.doThis,
                           'tan': self.doThis, 'up': self.doThis,
                           'X1': self.doThis, 'X2': self.doThis,
                           'log': self.doThis, 'ln': self.doThis,
                           'store': self.doThis, 'off': self.turnoff,
                           'neg': self.doThis, 'enter': self.doEnter,
                           }
        # Input collected since the last Enter.
        self.current = ""

    def doThis(self,action):
        """Placeholder for function keys that have no implementation yet."""
        print('"%s" has not been implemented' % action)

    def turnoff(self, *args):
        self.quit()

    def clearall(self, *args):
        """Discard the pending input and empty the display."""
        self.current = ""
        self.display.component('text').delete(1.0, END)

    def doEnter(self, *args):
        """Evaluate the pending input; show a non-empty result in the display."""
        result = self.calc.runpython(self.current)
        if result:
            self.display.insert(END, '\n')
            self.display.insert(END, '%s\n' % result, 'ans')
        self.current = ""

    def doKeypress(self, event):
        """Mirror keyboard typing into self.current, honoring backspace."""
        key = event.char
        if not key in ['\b']:
            self.current = self.current + event.char
        if key == '\b':
            self.current = self.current[:-1]

    def keyAction(self, key):
        """Handle a clicked character key: show it and append it to the input."""
        self.display.insert(END, key)
        self.current = self.current + key

    def evalAction(self, action):
        """Dispatch a clicked function key; unknown actions are ignored."""
        try:
            self.actionDict[action](action)
        except KeyError:
            pass

    def buildCalculator(self):
        """Create the display and the key rows."""
        FUN = 1                # Designates a Function
        KEY = 0                # Designates a Key
        KC1 = 'gray30'         # Dark Keys
        KC2 = 'gray50'         # Light Keys
        KC3 = 'steelblue1'     # Light Blue Key
        KC4 = 'steelblue'      # Dark Blue Key
        # Each entry: (key text, left label, right label, color, type,
        # action name for FUN keys / inserted character for KEY keys)
        keys = [
            [('2nd', '', '', KC3, FUN, 'second'),   # Row 1
             ('Mode', 'Quit', '', KC1, FUN, 'mode'),
             ('Del', 'Ins', '', KC1, FUN, 'delete'),
             ('Alpha','Lock', '', KC2, FUN, 'alpha'),
             ('Stat', 'List', '', KC1, FUN, 'stat')],
            [('Math', 'Test', 'A', KC1, FUN, 'math'),   # Row 2
             ('Mtrx', 'Angle','B', KC1, FUN, 'matrix'),
             ('Prgm', 'Draw', 'C', KC1, FUN, 'program'),
             ('Vars', 'YVars','', KC1, FUN, 'vars'),
             ('Clr', '', '', KC1, FUN, 'clear')],
            [('X-1', 'Abs', 'D', KC1, FUN, 'X1'),   # Row 3
             ('Sin', 'Sin-1','E', KC1, FUN, 'sin'),
             ('Cos', 'Cos-1','F', KC1, FUN, 'cos'),
             ('Tan', 'Tan-1','G', KC1, FUN, 'tan'),
             ('^', 'PI', 'H', KC1, FUN, 'up')],
            [('X2', 'Root', 'I', KC1, FUN, 'X2'),   # Row 4
             (',', 'EE', 'J', KC1, KEY, ','),
             ('(', '{', 'K', KC1, KEY, '('),
             (')', '}', 'L', KC1, KEY, ')'),
             ('/', '', 'M', KC4, KEY, '/')],
            [('Log', '10x', 'N', KC1, FUN, 'log'),   # Row 5
             ('7', 'Un-1', 'O', KC2, KEY, '7'),
             ('8', 'Vn-1', 'P', KC2, KEY, '8'),
             ('9', 'n', 'Q', KC2, KEY, '9'),
             ('X', '[', 'R', KC4, KEY, '*')],
            [('Ln', 'ex', 'S', KC1, FUN, 'ln'),   # Row 6
             ('4', 'L4', 'T', KC2, KEY, '4'),
             ('5', 'L5', 'U', KC2, KEY, '5'),
             ('6', 'L6', 'V', KC2, KEY, '6'),
             ('-', ']', 'W', KC4, KEY, '-')],
            [('STO', 'RCL', 'X', KC1, FUN, 'store'),   # Row 7
             ('1', 'L1', 'Y', KC2, KEY, '1'),
             ('2', 'L2', 'Z', KC2, KEY, '2'),
             ('3', 'L3', '', KC2, KEY, '3'),
             ('+', 'MEM', '"', KC4, KEY, '+')],
            [('Off', '', '', KC1, FUN, 'off'),   # Row 8
             ('0', '', '', KC2, KEY, '0'),
             ('.', ':', '', KC2, KEY, '.'),
             ('(-)', 'ANS', '?', KC2, FUN, 'neg'),
             ('Enter','Entry','', KC4, FUN, 'enter')]]
        self.display = Pmw.ScrolledText(self, hscrollmode='dynamic',
                            vscrollmode='dynamic', hull_relief='sunken',
                            hull_background='gray40', hull_borderwidth=10,
                            text_background='honeydew4', text_width=16,
                            text_foreground='black', text_height=6,
                            text_padx=10, text_pady=10, text_relief='groove',
                            text_font=('arial', 12, 'bold'))
        self.display.pack(side=TOP, expand=YES, fill=BOTH)
        self.display.tag_config('ans', foreground='white')
        # The event sequences had been stripped to empty strings, so neither
        # handler was attached to an event.  Any key press feeds doKeypress
        # and Return triggers evaluation.
        self.display.component('text').bind('<Key>', self.doKeypress)
        self.display.component('text').bind('<Return>', self.doEnter)
        for row in keys:
            rowa = Frame(self, bg='gray40')    # labels above the keys
            rowb = Frame(self, bg='gray40')    # the keys themselves
            for p1, p2, p3, color, ktype, func in row:
                # Default arguments capture the current func for each
                # callback (avoids the late-binding closure problem).
                if ktype == FUN:
                    a = lambda s=self, a=func: s.evalAction(a)
                else:
                    a = lambda s=self, k=func: s.keyAction(k)
                SLabel(rowa, p2, p3)
                Key(rowb, text=p1, bg=color, command=a)
            rowa.pack(side=TOP, expand=YES, fill=BOTH)
            rowb.pack(side=TOP, expand=YES, fill=BOTH)
class Evaluator:
    """Evaluate calculator input as Python code in a private namespace.

    The namespace starts with everything from the math module, so names
    such as sqrt and pi are available.

    NOTE: eval/exec run arbitrary Python code; only feed this input typed
    by the local user, never untrusted data.
    """

    def __init__(self):
        self.myNameSpace = {}
        self.runpython("from math import *")

    def runpython(self, code):
        """Run *code* and return its result as text.

        Returns repr() of the value when *code* is an expression, None when
        it is a statement that ran successfully, and 'Error' when it fails.
        """
        try:
            # The eval call used to sit inside quotes, so the method always
            # returned that literal text and never evaluated anything.
            return repr(eval(code, self.myNameSpace, self.myNameSpace))
        except SyntaxError:
            # Not an expression: try it as a statement (assignment, import).
            try:
                exec(code, self.myNameSpace, self.myNameSpace)
            except Exception:
                return 'Error'
        except Exception:
            # The expression failed at run time, e.g. unknown name or
            # division by zero.
            return 'Error'
# Create the calculator window and run the Tk event loop until it is closed.
Calculator().mainloop()


