Create a Date in Airflow’s Execution_Date Format (ISO with Time Zone)

If you are using Apache Airflow and need to create a date to compare against the Airflow execution_date, here is a simple, clean way of doing it.

Airflow’s dates are in the ISO 8601 standard format, with a time zone offset.  From their docs (https://airflow.apache.org/docs/stable/timezone.html):

Support for time zones is enabled by default. Airflow stores datetime information in UTC internally and in the database. It allows you to run your DAGs with time zone dependent schedules. At the moment Airflow does not convert them to the end user’s time zone in the user interface. There it will always be displayed in UTC.

The execution_date will always be in UTC, so this piece of code will always produce the current time in the same format as Airflow’s execution_date:

from datetime import datetime, timezone

# Current UTC time as an ISO 8601 string with a time zone offset.
datetime.now(timezone.utc).isoformat('T')

Also, you should note that these dates appear in this format:

2020-04-01T00:41:15.926862+00:00

This is great because it means that you can compare them as strings, since the fields are ordered most-significant-first and the hours are 24-hour based.
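
For instance, here is a minimal sketch of such a string comparison; the execution_date value below is just an illustrative literal, and in a real DAG you would get it from the task context:

from datetime import datetime, timezone

# Illustrative execution_date value; in a real DAG this comes from Airflow.
execution_date = "2020-04-01T00:41:15.926862+00:00"

# Current time in the same ISO 8601 UTC format.
now = datetime.now(timezone.utc).isoformat('T')

# Both strings are most-significant-first, so plain string comparison works.
if execution_date < now:
    print("The execution_date is in the past.")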

Connecting to Hive from Python

I was using Hive via Presto for a project, but then I ran into an issue: Presto does not support Hive views.  So, to be kind to the user, I wanted to present the view definition so they could see how to query the underlying tables.

Unfortunately, you can’t get view definitions from Presto either! So, I had to query Hive directly from a Python project.

Two Options

There are two options that I found for achieving this, and surprisingly neither one was great.  You would think this would be easy, right!?

  1. Use PyHive – This is the standard connector you would expect to find, except it does not install or work on Windows.  So, if you develop on Windows and deploy to Linux, it is painful.  It also needs some other system-level dependencies that can be painful to track down.
  2. Use JayDeBeApi – This uses a Java JAR (the Hive JDBC driver) to connect, which means it needs Java installed on your machine.  DO NOT USE THIS – I quickly ran into a critical bug that happens on both Windows and Linux: if you open one connection, do work, and close it, you cannot open another connection.  There is a GitHub issue for it, and the reporter had to resort to putting the work in a separate script and calling it as a sub-process for each command, which is ridiculous.

So, as I’m deploying on Linux (even though I develop on Windows), PyHive wins.

More on PyHive

So, to install PyHive, you would do the following (but it probably won’t work yet, at least not on CentOS 7 where I tried it).

pip install pyhive[hive]

Additional Dependencies

In order to get “pyhive[hive]” to install on a server (I tested with CentOS 7), you have to ensure some other dependencies are available as well.

I was working from Python 3.6 in a virtual environment, and the following worked properly:

sudo yum install gcc-c++ python-devel.x86_64 cyrus-sasl-devel.x86_64
pip install pyhive[hive]

Windows Development

Note that if you do the install without the extra [hive], you will not get all the dependencies.  The reason they’re broken out is that PyHive technically supports both Hive and Presto, so you get to pick which set of dependencies you need.

This is a mixed blessing; you can install the package on Windows and develop without the extra [hive] but if you try to execute the code it will fail.  To run it on Linux you need the full set of dependencies.

I recommend guarding the pyhive import and any related code in your project with if os.name != "nt": so that you can run through on Windows without getting errors; a minimal sketch of the guard is shown below.  Hopefully your project is like mine, where Hive is a side case and you can test plenty without the final calls.
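
Here is a minimal sketch of that guard; the function is just a placeholder to show the pattern:

import os

# pyhive's SASL dependencies generally don't install on Windows, so only import it elsewhere.
if os.name != "nt":
    from pyhive import hive


def get_hive_connection(host, port=10000):
    # On Windows this code path should never be exercised.
    if os.name == "nt":
        raise RuntimeError("Hive connections are not supported on Windows in this project.")
    return hive.Connection(host=host, port=port)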

Query Code

The following is a short example of how to run a query with PyHive, assuming you have it all set up properly as described above.

import logging

import pandas as pd
from pyhive import hive

logger = logging.getLogger(__name__)

conn = None
cursor = None

try:
    # Fill in the database and view names between the backticks.
    query = "describe extended ``.``"
    conn = hive.Connection(host="host-name", port=10000)

    cursor = conn.cursor()
    cursor.execute(query)
    query_results = cursor.fetchall()
    column_names = [part[0] for part in cursor.description]
    df = pd.DataFrame(query_results, columns=column_names)

except Exception:
    logger.exception("Error while pulling view details.")
    raise

finally:
    if cursor is not None:
        cursor.close()
    if conn is not None:
        conn.close()

Logging in Python 3 (Like Java Log4J/Logback)

What is Proper Logging?

Having a proper logger is essential for any production application.  In the Java world, almost every framework automatically pulls in Logback or Log4J, and libraries tend to use SLF4J in order to be logger agnostic and wire up to those loggers.  So, I set out to see how to do similar logging in Python.

While it can get fancier, I think the following things are essential when setting up a logger, so they are what I was looking for:

  1. It should be externally configured from a file that your operations team can change.
  2. It should write to a file automatically, not just console.
  3. It should roll the file it writes to at a regular size (date or time rolling on top of that can be beneficial too; but the size restriction ensures you won’t fill up your disk with a ton of logs and break your applications).
  4. It should keep a history of a few previous rolled files to aid debugging.
  5. It should use a format that specifies both the time of the logs and the class that logged them.

On top of these, obviously we must be able to log at different levels and filter out which logs go to the file easily.  This way, when we have issues, operations can jack up the logging level and figure out what is going wrong as needed.

How Do We Do it in Python 3?

It turns out that Python actually has a strong logging library built into its core distribution.  The only extra library I had to add to use it was PyYAML, and even that could have been avoided (Python supports JSON out of the box and that could be used instead, but people seem to prefer YAML configuration in the community).

In the place where your app starts up, write the following code. Note that you have to install the PyYAML module yourself. Also, this expects the “logging.yaml” file to be in the same directory as the startup code (change that if you like though). We’ll show the “logging.yaml” content below.

import logging
import logging.config
import yaml

# Initialize the logger once as the application starts up.
with open("logging.yaml", 'rt') as f:
    config = yaml.safe_load(f.read())
logging.config.dictConfig(config)

# Get an instance of the logger and use it to write a log!
# Note: Do this AFTER the config is loaded above or it won't use the config.
logger = logging.getLogger(__name__)
logger.info("Configured the logger!")

Then, when you want to use the logger in other modules, simply do this:

import logging
logger = logging.getLogger(__name__)
logger.info("Using the logger from another module.")

Of course, you just have to import logging and get the logger once at the top of each module, not every time you write a log.

This code uses “logging.yaml” which contains the following settings. Note that:

  • It defines a formatter with the time, module name, level name, and the logging message.
  • It defines a rotating file handler which writes to operations-audit-query-service.log and rolls the file at 10MB, keeping 5 historic copies.  The handler is set up to use our simple format from above.
  • The “root” logger writes to our handler and allows only INFO messages through.
  • The handler is set to DEBUG, so if the root logger is increased to DEBUG during an investigation, it will let the messages through to the log file.

Here is the “logging.yaml” example file:

---
version: 1
disable_existing_loggers: False

# Define format of output logs (named 'simple').
formatters:
    simple:
        format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

handlers:

    # Create rotating file handler using 'simple' format.
    file_handler:
        class: logging.handlers.RotatingFileHandler
        level: DEBUG
        formatter: simple
        filename: operations-audit-query-service.log
        maxBytes: 10485760 # 10MB
        backupCount: 5
        encoding: utf8

root:

    level: INFO
    handlers: [file_handler]

References

The code and YAML for this was adapted from this very good blog which I recommend reading: https://fangpenlin.com/posts/2012/08/26/good-logging-practice-in-python/.

Python Dictionary Comprehension For Multi Level Caching

What’s the Use Case?

I was coding a multi-level cache in Python and came across dictionary comprehensions.  It turns out they are very useful for this! So, it’s a nice example to teach the feature.

Let’s say our cache is like a database schema layout:

  1. database_1
    1. table_1
      1. column_1 – type 1
      2. column_2 – type 2
    2. table_2
      1. column_1 – type 1
      2. column_2 – type 2
  2. database_2
    1. table_1
      1. column_1 – type 1
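
In Python terms, the fully populated cache is just a nested dictionary mirroring that layout; roughly like the following (the names and types are purely illustrative, and the innermost value is simply whatever your column lookup returns):

cache = {
    "database_1": {
        "table_1": ["column_1 - type 1", "column_2 - type 2"],
        "table_2": ["column_1 - type 1", "column_2 - type 2"],
    },
    "database_2": {
        "table_1": ["column_1 - type 1"],
    },
}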

What’s a Dictionary Comprehension?

A dictionary comprehension basically lets you create a dictionary out of an expression.  So, you can essentially say “for each value in this list create a key for my dictionary where the value is X if some condition is true”.

A couple things to note:

  • You can technically provide any number of input iterables. So, if you want to form a dictionary from multiple sources, it can work; but I’m not going to get into that here; google elsewhere!
  • You can provide any number of “if” clauses to prune the results down.  You could achieve this with one combined condition, but using one clause per condition is neater to write.  (There is a small sketch showing multiple sources and multiple “if” clauses after the example below.)

A quick example:

>>> some_list = [1,3,8,12]
>>> {key: key * key for key in some_list if key * key % 2 == 0}
{8: 64, 12: 144}

So, here we can see that we took in a list of 4 numbers and made a dictionary out of the number and its square, but only kept the results where the square was even.

The first part “key: key * key” is really just a key : value pair.  The key is on the left and the value (the key * key) could be anything you want on the right.  You can call “key” anything you like in the “for key” section.  The “in some_list” is the source collection where our input comes from – again, you can have multiple of these.  Finally, the “if key * key % 2 == 0” is a filter condition which, again, you can have multiple of.
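
As noted above, you can also pull from multiple source iterables and stack multiple “if” clauses.  Here is a small sketch; the database and table names are made up purely for illustration:

databases = ["db_1", "db_2"]
tables = ["table_1", "table_2", "tmp_table"]

# Two "for" clauses (one per source) and two "if" clauses (one per condition).
qualified = {
    f"{db}.{tbl}": None
    for db in databases
    for tbl in tables
    if not tbl.startswith("tmp")
    if db != "db_2"
}
# {'db_1.table_1': None, 'db_1.table_2': None}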

Why is it Useful For Multi-Level Caching?

In our database example, we must first query the list of databases, then query the list of tables for any database we care about, then query the list of columns for any table we care about.

We don’t want to cache things we don’t need.

So, first off, it would be nice to prime the cache with just the database names and empty table values like so. If the cache is already populated, we just return its top level keys which are the database names:

if cache is None:
    cache = {database: None for database in get_database()}
return list(cache.keys())

Now, what about when the user goes to list the tables in a database?

if cache[database_name] is None:
    cache[database_name] = {table: None for table in get_tables(database_name)}
return list(cache[database_name].keys())

Finally, what about when the user goes to list the columns in a table?

if cache[database_name][table_name] is None:
    cache[database_name][table_name] = get_columns(database_name, table_name)
return cache[database_name][table_name]

So, we can see here that it was trivial to use dictionary comprehensions to turn a list into a dictionary with empty (None) values while building out the multi-level cache – which is very cool.

This might not have been the best way to build a cache – but it seems pretty simple and efficient to me. Building a class around it is usually a better approach though, admittedly :).
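
For reference, here is a self-contained sketch that ties the three snippets above together; the get_database, get_tables, and get_columns functions are stubbed with placeholder data and would be your real metadata queries in practice:

# Hypothetical stand-ins for the real metadata lookups (replace with your own queries).
def get_database():
    return ["database_1", "database_2"]

def get_tables(database_name):
    return ["table_1", "table_2"]

def get_columns(database_name, table_name):
    return ["column_1 - type 1", "column_2 - type 2"]

cache = None

def list_databases():
    global cache
    if cache is None:
        # Prime the cache with database names only; tables are filled in lazily.
        cache = {database: None for database in get_database()}
    return list(cache.keys())

def list_tables(database_name):
    if cache[database_name] is None:
        cache[database_name] = {table: None for table in get_tables(database_name)}
    return list(cache[database_name].keys())

def list_columns(database_name, table_name):
    if cache[database_name][table_name] is None:
        cache[database_name][table_name] = get_columns(database_name, table_name)
    return cache[database_name][table_name]

print(list_databases())                       # ['database_1', 'database_2']
print(list_tables("database_1"))              # ['table_1', 'table_2']
print(list_columns("database_1", "table_1"))  # ['column_1 - type 1', 'column_2 - type 2']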