Extract Airflow’s execution_date to a String From a Task

If you’re in an airflow task, you can get the execution date, as a string, in the same format the API’s/etc use, like this:

def save_exec_date(filename, **context):

    with open(filename, 'w') as fp:
        fp.write(context["execution_date"].to_iso8601_string(extended=True))


save_exec_date = PythonOperator(
    task_id='save_exec_date_to_file',
    python_callable=save_exec_date,
    op_kwargs={
        'filename': exec_date.txt,
    },
    provide_context=True,
    dag=dag)

Create a Date in Airflow’s Execution_Date Format (ISO with Time Zone)

If you are using Apache Airflow and you find a need to make a date in order to compare it to the airflow execution date, here is a simple/clean way of doing it.

Airflow’s dates appear to be in the ISO standard format, with a time zone qualifier.  From their docs here (https://airflow.apache.org/docs/stable/timezone.html):

Support for time zones is enabled by default. Airflow stores datetime information in UTC internally and in the database. It allows you to run your DAGs with time zone dependent schedules. At the moment Airflow does not convert them to the end user’s time zone in the user interface. There it will always be displayed in UTC.

The execution_date will always be in UTC.  So, this piece of code should always work to get the current time in airflow execution_date’s time format:

from datetime import datetime, timezone
datetime.now(timezone.utc).isoformat('T')

Also, you should note that these dates appear in this format:

2020-04-01T00:41:15.926862+00:00

This is great because it means that you can compare them as strings since the numbers are all most-significant-first and 24-hour based.

Validate/Check Parquet File Schema From PC/Laptop

Checking a Parquet Schema

Generally, when I have had to check the schema of a parquet file in the past, I have checked it within Apache Spark, or by using https://github.com/apache/parquet-mr/tree/master/parquet-tools.

Today, I had to check a parquet file schema and I came across this nifty python utility though: https://github.com/chhantyal/parquet-cli.  I think it’s just a wrapper around pyarrow, but it is slick and works easily.

You can pip install it trivially and then use it to view the data and schema of a parquet file with ease.  Here’s an example of installing it and checking a schema:

$ pip install parquet-cli
...

$ parq part-00000-679c332c.c000.snappy.parquet --schema

# Schema
ID: BYTE_ARRAY String
OrderID: BYTE_ARRAY String
SaleID: BYTE_ARRAY String
OrderDate: BYTE_ARRAY String
Pack: BYTE_ARRAY String
Qnty: BYTE_ARRAY String
Ratio: BYTE_ARRAY String
Name: BYTE_ARRAY String
Org: BYTE_ARRAY String
Category: BYTE_ARRAY String
Type: BYTE_ARRAY String
Percentage: BYTE_ARRAY String

Needless to say, this is much easier than dealing with Spark or parquet tools for schema validation or checks of not-too-huge parquet file data.

Connecting to Hive from Python

I was using Hive via Presto for a project, but then I ran into an issue where Presto cannot support Hive views.  So, to be kind to the user, I wanted to present the view definition so they could see how to query the underlying tables.

Unfortunately, you can’t get view definitions from Presto either! So, I had to directly query hive from a Python project.

Two Options

There are two options that I found for achieving this, and surprisingly neither one was great.  You would think this was easy right!?

  1. Use PyHive – This is the standard connector you would have expected to find, except it does not install and/or work on Windows.  So, if you develop on Windows and deploy to Linux, it is painful.  Also, you need some other things on the system for it to work which can be painful to find.
  2. Use JayDeBeApi – This uses the Java JAR to connect to Hive which means it needs Java installed on your machine.  DO NOT USE THIS – I quickly ran into a critical bug that happens on both Windows and Linux – if you open one connection, do work, and close it, you cannot open another connection.  It happens on Windows and Linux.  There is a git story for it and the person had to resort to putting it in another script and calling it as a sub-process for each command which is ridiculous.

So, as I’m deploying on Linux (even though I develop on Windows), PyHive wins.

More on PyHive

So, to install PyHive, you would do the following (but it probably won’t work yet, at least not on Centos7 where I tried it).

pip install pyhive[hive]

Additional Dependencies

In order to get “pyhive[hive]” to install on a server (I tested with Centos7), you have to ensure some other dependencies are available as well.

I was working from Python 3.6 in a virtual environment, and the following worked properly:

sudo yum install gcc-c++ python-devel.x86_64 cyrus-sasl-devel.x86_64
pip install pyhive[hive]

Windows Development

Note that if you do the install without the extra [hive] you will not get all the dependencies.  The reason they’re broken out is this technically supports both Hive and Presto, and that means you get to pick which dependencies you need.

This is a mixed blessing; you can install the package on Windows and develop without the extra [hive] but if you try to execute the code it will fail.  To run it on Linux you need the full set of dependencies.

I recommend guarding the pyhive import and any related code in your project with if os.name != “nt”: in order to ensure you can run through on Windows without getting errors.  Hopefully your project is like mine where this is a side case and I can test plenty without the final calls.

Query Code

The following is a short example of how to do a query from PyHive assuming you have it all set up properly as we talked about above.

conn = None
cursor = None

try:
    query = "describe extended ``.``"
    conn = hive.Connection(host="host-name", port="10000")

    cursor = conn.cursor()
    cursor.execute(query)
    query_results = cursor.fetchall()
    column_names = [part[0] for part in cursor.description]
    df = pd.DataFrame(query_results, columns=column_names)

except Exception as ex:
    logger.info("Error while pulling view details.", ex)
    raise ex

finally:

    if cursor is not None:
        cursor.close()
    if conn is not None:
        conn.close()