
Python ETL for linear table copy from PG SQL to PG SQL (P2P).

This one is going to be about a simple linear Python ETL tool (module) I wrote during one of my previous work challenges. I had to get the ETL process running quickly (as quickly as I could 🤠) to get the data into my DB and start working on the transformations and reports.

The code

Straight away, here's the code*: github.com/rusloc/PGMOVE
* I'm not a Python coder, so this is the solution that worked for me, but I bet it lacks optimization, typing and comments.

The definition

While working on a full-stack BI solution I had to implement an ETL process that copies data from the source (PG SQL on a VM) to the destination (another PG SQL on another VM). Since the microservice backend architecture (tables & table structures) was excessive for analytical tasks, I had to cut down the number of tables and possibly the number of columns.

The limitations were:

  • Get it working fast
  • Make the ETL linear (since I was going to do all transformations in my DB)
  • Make the ETL automatic
  • Add reporting functionality (I wanted alerts on whether the scheduled tasks succeeded or failed)
  • Add some logging so that in case of errors I could trace the problems and fix them
  • Add a way to configure the ETL settings
  • Use only local VMs, since I couldn't use Power BI cloud services for ETL

With all these initial requirements I started writing code. Python was the only choice for me.

The solution

To speed up development I decided to go with a linear copy of selected tables and add the needed transformations in the destination DB later on.

The setup was like this:

  • Source VM with a PG SQL DB that accumulates data from the backend microservice architecture
  • Destination VM with a PG SQL DB that should receive selected data from the source
  • Both are in the same data center

I went with a solution that comprises the following logical blocks:

  • A Python module that does the copying
  • A settings file (see the sketch after this list) that holds info like:
    • connection secrets
    • table mappings (source to destination)
    • some extra data that supports the process
  • A wrapper (Python script) that calls all the needed modules
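
Just to make the structure concrete, here's a rough sketch of what such a settings file could look like. All the names and values below are illustrative placeholders, not the actual PGMOVE config:

    _SETTINGS = {
        # connection secrets (placeholders)
        'source': {
            'host': '10.0.0.1', 'port': 5432, 'dbname': 'backend',
            'user': 'etl', 'password': '***'},
        'destination': {
            'host': '10.0.0.2', 'port': 5432, 'dbname': 'analytics',
            'user': 'etl', 'password': '***'},

        # table mappings: source table -> destination table
        'mappings': {
            'orders': '_orders',
            'clients': '_clients'},

        # extra data that supports the process (Telegram alert channel)
        'chat': '-100123456789',
        'key': '123456:ABC-DEF',
    }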

First I wrote the main module that extracts the data from the source and copies it into the destination.
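
The idea of that module is dead simple: read the rows from the source table and reload them into the destination table. Here's a minimal sketch of that idea (using psycopg2; the function and its parameters are placeholders of mine, the real module in the repo does more):

    import psycopg2

    def copy_table(src_dsn, dst_dsn, src_table, dst_table):
        '''Linear copy: read every row from the source table
        and reload it into the destination table (full refresh).'''

        src = psycopg2.connect(src_dsn)
        dst = psycopg2.connect(dst_dsn)

        try:
            with src.cursor() as read_cur, dst.cursor() as write_cur:
                read_cur.execute(f'SELECT * FROM {src_table}')
                rows = read_cur.fetchall()

                # wipe the destination and load the fresh snapshot
                write_cur.execute(f'TRUNCATE TABLE {dst_table}')
                placeholders = ','.join(['%s'] * len(read_cur.description))
                write_cur.executemany(
                    f'INSERT INTO {dst_table} VALUES ({placeholders})', rows)

            dst.commit()
        finally:
            src.close()
            dst.close()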

Then I added one more class that extends Python's basic logging module. That was quite new for me, and I learned in a practical way what inheritance is. In general the OOP principles came in handy in this solution.

Here's the tgLogger class that sends alerts to Telegram:

                            
    import logging
    import requests


    class tgLog(logging.Handler):

        '''
                *******
                ! DOC !
                *******

                Custom handler for logging into a TG channel

                1. inherit from logging.Handler and call super().__init__()
                2. requests is needed for the HTTP session
                3. override the original EMIT method

        '''

        def __init__(s, chat, key):

            super().__init__()

            s._chat = chat          # Telegram chat (group) id
            s._key = key            # Telegram bot token
            s._session = requests.Session()


        def emit(s, message):

            # format the log record and push it to the Telegram Bot API
            _text = s.format(message)
            s._session.post(f'https://api.telegram.org/bot{s._key}/sendMessage?chat_id={s._chat}&text={_text}')
                            
                        
* You can find it here: github.com/rusloc/PGMOVE/tgLogger and easily use it in your own project.

It looks very simple, yet it works fine and does its job of sending messages to the group.
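
If you want to try it, the wiring is roughly this (the chat id and bot token below are placeholders; use your own group id and bot key):

    import logging

    alert_logger = logging.getLogger('alerts')
    alert_logger.setLevel(logging.INFO)

    # placeholders: put your own chat id and bot token here
    handler = tgLog(chat='-100123456789', key='123456:ABC-DEF')
    handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
    alert_logger.addHandler(handler)

    alert_logger.info('ETL run finished')    # this message lands in the Telegram group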

There are a few more aspects that took some time to implement:

  • Setting up loggers and placing them here and there in the code to monitor the process
  • Creating a SQL query that grabs the DDL statement of the source table so the same table can be reproduced in the destination

So let me show these two aspects.

Loggers

In the code I initialized three loggers in total: INFO, ERROR and TELEGRAM.

                
    def set_loggers(s):
        '''
        Setup loggers: information logger, error logger & telegram logger.
        An additional subclassed TG logger with the overridden emit method is required.
        '''
        
        _handler = logging.FileHandler('_etl_logger.txt', 'a')
        _formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - func_name:  %(funcName)s - %(message)s')
        
        #info logger
        
        info_logger = logging.getLogger('info_logger')
        info_logger.setLevel(logging.INFO)

        _handler.setFormatter(_formatter)
        info_logger.addHandler(_handler)

        # error logger

        error_logger = logging.getLogger('error_logger')
        error_logger.setLevel(logging.ERROR)

        _handler.setFormatter(_formatter)
        error_logger.addHandler(_handler)
        
        # telegram logger
        
        telegram_logger = logging.getLogger('telegram_logger')
        telegram_logger.setLevel(logging.INFO)

        http_Handler = tgLog(
            chat=s._chat,
            key=s._key)

        http_Handler.setFormatter(_formatter)

        telegram_logger.addHandler(http_Handler)

        return info_logger, error_logger, telegram_logger
                
            

The info and error loggers write to the same local file (logging both errors when they occur and general info about the operations).

The Telegram logger sends alerts and info messages to the group (just go and create a group in TG and get its details from your browser's address line).

And my takeaway from this was very important (and it turned out the solution was easy): just initialize a logger, add a handler and a formatter. They will help you A LOT and save tons of time.

So easy, and the value (the result) is so great. That was the case where I realized why you need loggers and that they're actually not hard to implement, even for a "coder" like me 😜

DDL

The next thing I had to do was get the source table schema and reproduce the same table in the destination (my own DB) every time the schema changed (in the source, of course).

So here is the query that works perfectly in PG SQL and grabs the DDL statement:

                
    SELECT                                          
        'CREATE TABLE >' || relname || '< ( ' || array_to_string(
            array_agg(
                '_' || column_name || ' ' ||  type || ' '|| not_null
                    )
                , ','
            ) || ')'
    FROM (
    
        SELECT 
            c.relname
            ,a.attname                                          column_name
            ,pg_catalog.format_type(a.atttypid, a.atttypmod)    type
            ,CASE 
                WHEN a.attnotnull
                    THEN 'NOT NULL' 
                ELSE 'NULL' END                                 not_null 
        FROM pg_class c
            ,pg_attribute a
            ,pg_type t
        WHERE 1=1 
            AND c.relname = '{_table_name_}'
            AND a.attnum > 0
            AND a.attrelid = c.oid
            AND a.atttypid = t.oid
        ORDER BY a.attnum
        ) as tabledefinition
    GROUP BY relname;
                
            

* You can grab this code and use it for your own purposes (it really works in PG SQL).
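
In the module, this query is only half of the job: its output (the generated CREATE TABLE statement) then has to be executed on the destination. Here's a rough sketch of that step (my own sketch, not the exact PGMOVE code; src_conn and dst_conn are assumed to be open psycopg2 connections, and ddl_query is the SQL text above with '{_table_name_}' as a placeholder):

    def recreate_table(src_conn, dst_conn, table_name, ddl_query):
        '''Rebuild the destination table from the source table's DDL.'''

        # 1. build the CREATE TABLE statement on the source side
        with src_conn.cursor() as cur:
            cur.execute(ddl_query.format(_table_name_=table_name))
            create_stmt = cur.fetchone()[0]

        # the generated statement wraps the table name in >...< markers;
        # strip them here (or swap them for your own naming convention)
        create_stmt = create_stmt.replace('>', '').replace('<', '')

        # 2. drop & recreate the table on the destination side
        with dst_conn.cursor() as cur:
            cur.execute(f'DROP TABLE IF EXISTS {table_name}')
            cur.execute(create_stmt)

        dst_conn.commit()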

The result

The result was a fully automatic ETL tool spinning on a VM (and scheduled via Cron) with:

  • logging to a local file that I could easily monitor and search for problems
  • instant messages in the Telegram group
  • an automatic table recreation process (when the source table changed, the destination table was replaced & repopulated accordingly)
  • control over the dataset I needed to have in my DB

With this instrument in place I could spend much, much more time on developing SQL views and producing BI reports.