Python ETL for linear table copy from PG SQL to PG SQL (P2P).
This one is going to be about a simple linear Python ETL tool (module) I wrote during one of my previous work challenges. I had to get the ETL process running quickly (as quickly as I could 🤠) so I could get the data into my DB and start working on the transformations and reports.
The code
Straight away, here's the code*:
github.com/rusloc/PGMOVE
* I'm not a Python coder, so this was simply the solution that worked for me; I bet it lacks optimization, type annotations and comments.
The definition
While working on a full-stack BI solution I had to implement an ETL process that copies data from a source (PG SQL on a VM) to a destination (another PG SQL on another VM). Since the microservice backend architecture (the tables & their structure) was excessive for analytical tasks, I had to cut the number of tables and, where possible, the number of columns.
The limitations were:
- Get it working fast
- Make the ETL linear (since I was going to do all transformations in my DB)
- Make the ETL automatic
- Add reporting functionality (I wanted alerts on whether the scheduled tasks succeeded or not)
- Add some logging so that in case of errors I could trace the problems and fix them
- Add a way to configure the settings of the ETL
- Use only local VMs, since I couldn't use Power BI cloud services for ETL
With all these initial constraints I went off to write code. Python was the only choice for me.
The solution
To speed up development I decided to go with a linear copy of selected tables and add the needed transformations in the destination DB later on.
Setup was like this:
- Source VM with a PG SQL DB that accumulated data from the backend microservice architecture
- Destination VM with a PG SQL DB that should receive selected data from the source
- Both are in the same data center
I went with the solution that comprises the following logical blocks:
- Python module that does the copying process
- Some settings file that holds info like (a sketch follows this list):
- connection secrets
- table mappings (source to destination)
- some extra data that supports the process
- Wrapper (Python script) that calls all needed modules
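To make that concrete, a settings file for such a process might hold something like the sketch below. This is my own hypothetical illustration of the structure (names, hosts and tables are made up), not the exact format used in PGMOVE:

# settings.py: a hypothetical example of what the configuration could hold

SOURCE_DSN = {
    'host': '10.0.0.1',      # source VM with the backend PG SQL
    'port': 5432,
    'dbname': 'backend',
    'user': 'etl_reader',
    'password': '***'}

DESTINATION_DSN = {
    'host': '10.0.0.2',      # destination VM with the analytical PG SQL
    'port': 5432,
    'dbname': 'analytics',
    'user': 'etl_writer',
    'password': '***'}

# source table -> destination table mapping: only the tables needed for BI
TABLE_MAP = {
    'orders': 'stg_orders',
    'clients': 'stg_clients'}

# extra data that supports the process (Telegram group & bot for alerts)
TELEGRAM = {
    'chat': '<chat_id>',
    'key': '<bot_token>'}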
First I wrote the main module that extracts the data from the source and copies it into the destination.
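To give an idea of what such a module does, here is a minimal sketch of one linear copy step using psycopg2. The function name and arguments are my own illustration built on the settings sketched above; the actual module in PGMOVE is more elaborate:

import io
import psycopg2

def copy_table(source_dsn, dest_dsn, src_table, dst_table):
    '''Stream one table from the source DB into the destination DB (full reload).'''
    buf = io.StringIO()

    # dump the source table as CSV into an in-memory buffer
    with psycopg2.connect(**source_dsn) as src_conn, src_conn.cursor() as src_cur:
        src_cur.copy_expert(f'COPY {src_table} TO STDOUT WITH CSV', buf)

    buf.seek(0)

    # clear the destination table and load the fresh data
    with psycopg2.connect(**dest_dsn) as dst_conn, dst_conn.cursor() as dst_cur:
        dst_cur.execute(f'TRUNCATE TABLE {dst_table}')
        dst_cur.copy_expert(f'COPY {dst_table} FROM STDIN WITH CSV', buf)

For large tables you would stream in chunks instead of buffering everything in memory, but for a quick linear copy this shape is enough.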
Then I added one more class that extends the standard Python logging module. That was quite new for me, and I learned in a practical way what inheritance is. In general, the OOP principles came in handy for this solution.
Here's the tgLog class that sends alerts to Telegram:
import logging
import requests

class tgLog(logging.Handler):
    '''
    *******
    ! DOC !
    *******
    Custom handler for logging into a TG channel
        1. inherit from logging.Handler (call super().__init__())
        2. requests is needed for the session setup
        3. override the original EMIT function
    '''

    def __init__(s, chat, key):
        super().__init__()
        s._chat = chat
        s._key = key
        s._session = requests.Session()

    def emit(s, message):
        _text = s.format(message)
        # let requests handle the url-encoding of the message text
        s._session.post(
            f'https://api.telegram.org/bot{s._key}/sendMessage',
            data={'chat_id': s._chat, 'text': _text})
* You can find it here: github.com/rusloc/PGMOVE/tgLogger and easily use it in your own project.
It looks very simple, yet it works fine and does its job of sending messages to the group.
There are a few more aspects that took some time to implement:
- Setting up loggers and placing them here & there in the code to monitor the process
- Creating a SQL query that gets the DDL statement of the original table, so the same table can be reproduced in the destination
Loggers
In the code I initialized three loggers in total: INFO, ERROR and TELEGRAM.
def set_loggers(s):
    '''
    Setup loggers: information logger, error logger & telegram logger
    Additional subclassed TG logger is required with mutated function.
    '''
    _handler = logging.FileHandler('_etl_logger.txt', 'a')
    _formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - func_name: %(funcName)s - %(message)s')

    # info logger
    info_logger = logging.getLogger('info_logger')
    info_logger.setLevel(logging.INFO)
    _handler.setFormatter(_formatter)
    info_logger.addHandler(_handler)

    # error logger
    error_logger = logging.getLogger('error_logger')
    error_logger.setLevel(logging.ERROR)
    _handler.setFormatter(_formatter)
    error_logger.addHandler(_handler)

    # telegram logger
    telegram_logger = logging.getLogger('telegram_logger')
    telegram_logger.setLevel(logging.INFO)
    http_Handler = tgLog(
        chat=s._chat,
        key=s._key)
    http_Handler.setFormatter(_formatter)
    telegram_logger.addHandler(http_Handler)

    return info_logger, error_logger, telegram_logger
The info and error loggers wrote to the same local file (whenever an error occurred, or just to record info about the operation).
The Telegram logger sent alerts and info into the group (just create a group in TG and grab its details from your browser's address line).
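For illustration only, a wrapper script might use the three loggers roughly like this; the etl instance, TABLE_MAP and copy_table are the hypothetical names from the sketches above, just to show where the logging hooks in:

info_logger, error_logger, telegram_logger = etl.set_loggers()

for src_table, dst_table in TABLE_MAP.items():
    try:
        copy_table(SOURCE_DSN, DESTINATION_DSN, src_table, dst_table)
        info_logger.info(f'copied {src_table} -> {dst_table}')
    except Exception:
        # full traceback goes to the local file, a short alert goes to Telegram
        error_logger.exception(f'failed to copy {src_table}')
        telegram_logger.info(f'ETL failed on table {src_table}')

telegram_logger.info('ETL run finished')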
My takeaway from this was very important (and the solution turned out to be easy): just initialize a logger, add a handler and a formatter. They will help you A LOT and save tons of time.
It's so easy, and the value (result) is so great. That was the moment I realized why you need loggers, and that they're actually not hard to implement even for a "coder" like me 😜
DDL
The next thing I had to do was get the source table's schema and reproduce the same table in the destination (my own DB) every time the schema changed (in the source, of course).
So here is the query that works perfectly in PG SQL and grabs the DDL statement:
SELECT
    'CREATE TABLE >' || relname || '< ( '
    || array_to_string(
        array_agg('_' || column_name || ' ' || type || ' ' || not_null)
        , ','
    ) || ')'
FROM (
    SELECT
        c.relname
        ,a.attname                                        column_name
        ,pg_catalog.format_type(a.atttypid, a.atttypmod)  type
        ,CASE
            WHEN a.attnotnull THEN 'NOT NULL'
            ELSE 'NULL'
         END not_null
    FROM pg_class c
        ,pg_attribute a
        ,pg_type t
    WHERE 1=1
        AND c.relname = '{_table_name_}'
        AND a.attnum > 0
        AND a.attrelid = c.oid
        AND a.atttypid = t.oid
    ORDER BY a.attnum
) AS tabledefinition
GROUP BY relname;
* you can grab this code and use it for your own purposes (it really works in PG SQL)
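On the Python side, the idea is roughly the following sketch (my own hypothetical names, assuming the query above is saved to a file and its '{_table_name_}' placeholder is filled via str.format): run it against the source, swap the > < markers around the table name for proper quoting, then drop and recreate the destination table before repopulating it.

# the SQL shown above, saved next to the module; '{_table_name_}' is the placeholder
with open('get_ddl.sql') as f:
    DDL_TEMPLATE = f.read()

def recreate_table(src_conn, dst_conn, table_name):
    '''Rebuild the destination table from the source table's current schema.'''
    with src_conn.cursor() as cur:
        cur.execute(DDL_TEMPLATE.format(_table_name_=table_name))
        create_stmt = cur.fetchone()[0]

    # the query wraps the table name in > <, which makes it easy to swap in quoting
    create_stmt = create_stmt.replace(f'>{table_name}<', f'"{table_name}"')

    with dst_conn.cursor() as cur:
        cur.execute(f'DROP TABLE IF EXISTS "{table_name}"')
        cur.execute(create_stmt)
    dst_conn.commit()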
The result
The result was a fully automatic ETL tool running on a VM (and scheduled via cron) with:
- logging to a local file that I could easily monitor and search for problems
- instant messages in the Telegram group
- automatic table recreation (when the source table changed, the destination table was replaced & repopulated accordingly)
- control over the dataset I needed to have in my DB
With this instrument up and running I could spend much, much more time on developing SQL views and producing BI reports.