Using SQLAlchemy with PyUNO
Contents
- 1 Original code
- 2 Process of the script
- 2.1 Logging module
- 2.2 Connect to datasource
- 2.3 OpenOffice on listening mode
- 2.4 Search and replace function
- 2.5 Load and save documents
- 2.6 Save documents
- 2.7 Perform query
- 2.8 Loop to replace fields
- 2.9 Load document from template
- 2.10 Generate search action
- 2.11 Search the data
- 2.12 Finalize the document
Original code
"""This program Gets records from sqlalchemy and replaces templates fields inside OpenOffice Copyright Lukasz Szybalski szybalski@gmail.com License: GNU General Public License 2 (GPL2)""" #Following is if we are importing a file, or setting it up in a cronjob. #This will switch folder to where the .py is located at. import os workfolder=os.path.dirname(__file__) if workfolder: print __file__ os.chdir(workfolder) #Start Logging error import logging logging.basicConfig() #Setup config log = logging.getLogger("MyApp") log.setLevel(logging.DEBUG) #set verbosity to show all messages of severity >= DEBUG #log.setLevel(logging.INFO) #set verbosity to show all messages of severity >= DEBUG log.info("Starting my app") import sqlalchemy #Mysql e = sqlalchemy.create_engine('mysql://user:pass@hostname/db_name') #Postgres #e = sqlalchemy.create_engine('postgres://user:pass@hostname/db_name') #Mssql #e = sqlalchemy.create_engine( "mssql://user:pass@hostname:1433/db_name?driver=TDS&odbc_options='TDS_Version=8.0'" ) #e.echo=True e.echo=False metadata=sqlalchemy.MetaData(e) log.info('Connection Set') from sqlalchemy.orm import sessionmaker Session = sessionmaker(bind=e, autoflush=True, autocommit=False) session = Session() #Connected to a database log.info("Connected to a Database") #----Table Definition and Python object that maps to it------- from sqlalchemy.orm import mapper #Addressbook master File addressbookrecords_table = sqlalchemy.Table('Addressbook', metadata, autoload=True) #Python Object class AddressbookRecords(object): pass #Mapper mapper(AddressbookRecords,addressbookrecords_table) log.info('Connected to a database, and have all the field names') #---------Start of OpenOffice Section------------ """This progrma will do find and replace and save an output to a different file""" #Load necessary items import uno import sys #You need to set few variables when using OpenOffice local = uno.getComponentContext() resolver = local.ServiceManager.createInstanceWithContext("com.sun.star.bridge.UnoUrlResolver", local) #One of the variables is context which connects to openoffice. #We test here if open office is listenting, if not we start it. try: #Am I able to connect? context = resolver.resolve("uno:socket,host=localhost,port=2002;urp;StarOffice.ComponentContext") log.debug('Connecting to OpenOffice') except Exception ,e : #If I got the error if 'connect to socket' in str(e): import os #Start it os.system('''/usr/bin/openoffice -accept="socket,host=localhost,port=2002;urp;"''') import time #Wait 3 seconds for openoffice to load. time.sleep(3) #See if we can connect again context = resolver.resolve("uno:socket,host=localhost,port=2002;urp;StarOffice.ComponentContext") log.debug('OpenOffice was not opened. I have openned it and connected') #Rest of the necessary variables. desktop = context.ServiceManager.createInstanceWithContext("com.sun.star.frame.Desktop", context) log.debug('Done Setting up all OpenOffice Variables') #----------Start manipulating openoffice document-------- import string #We will use this function to find and replace things on the page. def findandreplace(document=None,search=None,find=None,replace=None): """This function searches and replaces. Create search, call function findFirst, and finally replace what we found.""" #What to search for search.SearchString = unicode(find) #search.SearchCaseSensitive = True #search.SearchWords = True found = document.findFirst( search ) if found: #log.debug('Found %s' % find) pass while found: found.String = string.replace( found.String, unicode(find),unicode(replace)) found = document.findNext( found.End, search) #This will be our mini file definition. Here we can specify which fileds get replaced. from datetime import datetime def replace_data(record=None): """Key and Value for replacement. Keys are defined by you values come from sqlalchemy. Here we define what $FirstName in odt file will be replaced with.""" data={} data['$FirstName']=record.FIRSTNAME data['$LastName']=record.LASTNAME data['$Address']=record.ADDRESS_1 data['$City']=record.CITY.strip() data['$State']=record.STATE data['$Zipcode']=record.ZIP_CODE data['$TodaysDate']=datetime.now().date() data['$User']=record.USER_NO data['$CaseID']=str(record.CASE_NUMBER) return data #Lets import page break as we will be doing a loop from com.sun.star.style.BreakType import PAGE_BEFORE, PAGE_AFTER #Lets create a new document (document2 and cursor2) that we will use to save our replaced pages to. document2 = desktop.loadComponentFromURL("private:factory/swriter", "_blank", 0, ()) cursor2 = document2.Text.createTextCursor() #Save the document. document2.storeAsURL("file:///home/lucas/ReminderPages.odt",()) log.info('Created New Document: ReminderPages.odt') #Selecting a date for my letters. All records created 35 Days ago. from datetime import timedelta days35=timedelta(days=35) day35=datetime.now().date()-days35 import time day35a=time.strftime('%Y%m%d',day35.timetuple()) #Select the records from the Database. allrecords=session.query(AddressbookRecords).filter( AddressbookRecords.CASE_OPEN_DATE==day35a ).order_by( AddressbookRecords.USER_NO ).all() #Print how many records we have log.info('We have %s Records from the database.' % len(allrecords)) #Lets start the loop log.debug('Starting to Loop Through Records') for record in allrecords: #Load file template document = desktop.loadComponentFromURL("file:///home/lucas/ReminderPagesTemplate.odt" ,"_blank", 0, ()) #We don't need a cursor since we will be searching and replacing only #cursor = document.Text.createTextCursor() #Create Search Descriptor search = document.createSearchDescriptor() #Pass in a sqlalchemy record to our recplace_data which will fill in the values for each field we want. #We will use these values to replace inside the odt. data=replace_data(record) #Do a loop of the data and replace the content.We pass in a dictionary data which has keys and values. #We find keys and replace them with values. for find,replace in data.items(): findandreplace(document,search,unicode(find),unicode(replace)) log.debug(str(find)+','+str(replace)) #Save replaced document as a temporary file. #Trying to find out how to skip this part but for now we do it. document.storeAsURL("file:///home/lucas/temp.odt",()) #Close File document.dispose() #We now append our temp file to our document that will hold all pages. cursor2.gotoEnd(False) cursor2.BreakType = PAGE_BEFORE cursor2.insertDocumentFromURL("file:///home/lucas/temp.odt", ()) document2.store() #2nd copie #cursor2.gotoEnd(False) #cursor2.BreakType = PAGE_BEFORE #cursor2.insertDocumentFromURL("file:///home/lucas/temp.odt", ()) #document2.store() #Exit document2.dispose() log.info('Done.I am exiting')
Process of the script
This code will use the Python SQLAlchemy module and connect with a datasource to replace the fields used in an OpenOffice template.
- The script will first use the logging module to capture any possible error on the code.
- Use sqlalchemy to connect to a datasource. Examples for MySQL, PostgresSQL and MS SQL were commented and connect to them.
- Start OpenOffice on listening mode through a socket.
- Create a function to search and replace within the document
- Create a dictionary with the new information
- Create function to load the document
- Create function to save the document
- Perform the query
- Perform the loop to replace the fields
- Load document template
- Generate the search action
- Do a second loop to search the data with the key values
- Append the temporary file into the final document
Logging module
First we import the necesary modules to define paths on the filesystem.
import os workfolder=os.path.dirname(__file__) if workfolder: print __file__ os.chdir(workfolder)
Then we proceed with the logging of errors and actions
#Start Logging error import logging logging.basicConfig() #Setup config log = logging.getLogger("MyApp") log.setLevel(logging.DEBUG) #set verbosity to show all messages of severity >= DEBUG #log.setLevel(logging.INFO) #set verbosity to show all messages of severity >= DEBUG log.info("Starting my app")
Connect to datasource
We have 3 types of databases that the script is being connected:
- MySQL
- PostgreSQL
- M$ SQL Server
import sqlalchemy #Mysql e = sqlalchemy.create_engine('mysql://user:pass@hostname/db_name') #Postgres #e = sqlalchemy.create_engine('postgres://user:pass@hostname/db_name') #Mssql #e = sqlalchemy.create_engine( # "mssql://user:pass@hostname:1433/db_name?driver=TDS&odbc_options='TDS_Version=8.0'" # )
We define if the conection is true or false depending on the success of the conection.
#e.echo=True e.echo=False metadata=sqlalchemy.MetaData(e) log.info('Connection Set')
We import sessionmaker from sqlalchemy to generate an Object relational model (ORM).
from sqlalchemy.orm import sessionmaker Session = sessionmaker(bind=e, autoflush=True, autocommit=False) session = Session() #Connected to a database log.info("Connected to a Database")
OpenOffice on listening mode
We import UNO module as well as sys for the paths and sockets. We then invoke the ComponentContext and the ServiceManager of the office suite to be able to access the API.
#---------Start of OpenOffice Section------------ """This progrma will do find and replace and save an output to a different file""" #Load necessary items import uno import sys #You need to set few variables when using OpenOffice local = uno.getComponentContext() resolver = local.ServiceManager.createInstanceWithContext("com.sun.star.bridge.UnoUrlResolver", local)
Then we follow with a conditional to start openoffice as a service using the socket, host and port. We add an exception in case this is not succesful.
#One of the variables is context which connects to openoffice. #We test here if open office is listenting, if not we start it. try: #Am I able to connect? context = resolver.resolve("uno:socket,host=localhost,port=2002;urp;StarOffice.ComponentContext") log.debug('Connecting to OpenOffice') except Exception ,e : #If I got the error if 'connect to socket' in str(e): import os #Start it os.system('''/usr/bin/openoffice -accept="socket,host=localhost,port=2002;urp;"''') import time #Wait 3 seconds for openoffice to load. time.sleep(3) #See if we can connect again context = resolver.resolve("uno:socket,host=localhost,port=2002;urp;StarOffice.ComponentContext") log.debug('OpenOffice was not opened. I have openned it and connected') #Rest of the necessary variables. desktop = context.ServiceManager.createInstanceWithContext("com.sun.star.frame.Desktop", context) log.debug('Done Setting up all OpenOffice Variables')
Search and replace function
#----------Start manipulating openoffice document-------- import string #We will use this function to find and replace things on the page. def findandreplace(document=None,search=None,find=None,replace=None): """This function searches and replaces. Create search, call function findFirst, and finally replace what we found.""" #What to search for search.SearchString = unicode(find) #search.SearchCaseSensitive = True #search.SearchWords = True found = document.findFirst( search ) if found: #log.debug('Found %s' % find) pass while found: found.String = string.replace( found.String, unicode(find),unicode(replace)) found = document.findNext( found.End, search)
Load and save documents
#Lets import page break as we will be doing a loop from com.sun.star.style.BreakType import PAGE_BEFORE, PAGE_AFTER #Lets create a new document (document2 and cursor2) that we will use to save our replaced pages to. document2 = desktop.loadComponentFromURL("private:factory/swriter", "_blank", 0, ()) cursor2 = document2.Text.createTextCursor() #Save the document. document2.storeAsURL("file:///home/lucas/ReminderPages.odt",()) log.info('Created New Document: ReminderPages.odt')
Save documents
Perform query
#Selecting a date for my letters. All records created 35 Days ago. from datetime import timedelta days35=timedelta(days=35) day35=datetime.now().date()-days35 import time day35a=time.strftime('%Y%m%d',day35.timetuple()) #Select the records from the Database. allrecords=session.query(AddressbookRecords).filter( AddressbookRecords.CASE_OPEN_DATE==day35a ).order_by( AddressbookRecords.USER_NO ).all() #Print how many records we have log.info('We have %s Records from the database.' % len(allrecords))
Loop to replace fields
#Lets start the loop log.debug('Starting to Loop Through Records') for record in allrecords: #Load file template document = desktop.loadComponentFromURL("file:///home/lucas/ReminderPagesTemplate.odt" ,"_blank", 0, ()) #We don't need a cursor since we will be searching and replacing only #cursor = document.Text.createTextCursor() #Create Search Descriptor search = document.createSearchDescriptor() #Pass in a sqlalchemy record to our recplace_data which will fill in the values for each field we want. #We will use these values to replace inside the odt. data=replace_data(record)
Load document from template
Generate search action
Search the data
#Do a loop of the data and replace the content.We pass in a dictionary data which has keys and values. #We find keys and replace them with values. for find,replace in data.items(): findandreplace(document,search,unicode(find),unicode(replace)) log.debug(str(find)+','+str(replace)) #Save replaced document as a temporary file. #Trying to find out how to skip this part but for now we do it. document.storeAsURL("file:///home/lucas/temp.odt",()) #Close File document.dispose()