Airflow Task and DAG State/Status Enumerations

I’ve been working on a kind of “remote DAG watcher DAG” lately.  Basically, I want to render all the tasks in a DAG from another Airflow deployment on my own Airflow.  This way, I can watch a set of tasks across multiple Airflows (some of which I may not control).

As part of this, I am having to use the (fairly bad) APIs; we use both the experimental API and the plugin-based one here.

Anyway, the APIs don’t document task and DAG states, so I frequently have to look them up in the code.  Here’s a reference for ease:

# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import unicode_literals

from builtins import object

class State(object):
    """
    Static class with task instance states constants and color method to
    avoid hardcoding.
    """

    # scheduler
    NONE = None
    REMOVED = "removed"
    SCHEDULED = "scheduled"

    # set by the executor (t.b.d.)
    # LAUNCHED = "launched"

    # set by a task
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    SHUTDOWN = "shutdown"  # External request to shut down
    FAILED = "failed"
    UP_FOR_RETRY = "up_for_retry"
    UP_FOR_RESCHEDULE = "up_for_reschedule"
    UPSTREAM_FAILED = "upstream_failed"
    SKIPPED = "skipped"

    task_states = (
        SUCCESS,
        RUNNING,
        FAILED,
        UPSTREAM_FAILED,
        SKIPPED,
        UP_FOR_RETRY,
        UP_FOR_RESCHEDULE,
        QUEUED,
    )

    dag_states = (
        SUCCESS,
        RUNNING,
        FAILED,
    )

    state_color = {
        QUEUED: 'gray',
        RUNNING: 'lime',
        SUCCESS: 'green',
        SHUTDOWN: 'blue',
        FAILED: 'red',
        UP_FOR_RETRY: 'gold',
        UP_FOR_RESCHEDULE: 'turquoise',
        UPSTREAM_FAILED: 'orange',
        SKIPPED: 'pink',
        REMOVED: 'lightgrey',
        SCHEDULED: 'tan',
        NONE: 'lightblue',
    }

    @classmethod
    def color(cls, state):
        return cls.state_color.get(state, 'white')

    @classmethod
    def color_fg(cls, state):
        color = cls.color(state)
        if color in ['green', 'red']:
            return 'white'
        return 'black'

    @classmethod
    def finished(cls):
        """
        A list of states indicating that a task started and completed a
        run attempt. Note that the attempt could have resulted in failure or
        have been interrupted; in any case, it is no longer running.
        """
        return [
            cls.SUCCESS,
            cls.SHUTDOWN,
            cls.FAILED,
            cls.SKIPPED,
        ]

    @classmethod
    def unfinished(cls):
        """
        A list of states indicating that a task either has not completed
        a run or has not even started.
        """
        return [
            cls.NONE,
            cls.SCHEDULED,
            cls.QUEUED,
            cls.RUNNING,
            cls.UP_FOR_RETRY,
            cls.UP_FOR_RESCHEDULE,
        ]
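For my remote-watcher use case, the main thing I do with these states is decide whether a remote task is still in flight.  Here’s a minimal sketch of that check, assuming the API hands back a JSON body with a “state” field (the response shape is an assumption; the set of unfinished states mirrors the class above):

```python
import json

# States that mean "not done yet" (mirrors State.unfinished(); assumed here).
UNFINISHED_STATES = {None, "scheduled", "queued", "running",
                     "up_for_retry", "up_for_reschedule"}

def task_still_running(api_response_body):
    """True if the task instance reported by the API has not finished."""
    info = json.loads(api_response_body)
    return info.get("state") in UNFINISHED_STATES

print(task_still_running('{"state": "up_for_retry"}'))  # True
print(task_still_running('{"state": "success"}'))       # False
```

Note that a JSON null state decodes to Python’s None, which is why None sits in the set alongside the string states.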

Extract Airflow’s execution_date to a String From a Task

If you’re in an Airflow task, you can get the execution date, as a string, in the same format the APIs etc. use, like this:

def save_exec_date(filename, **context):
    # 'ts' is the execution_date as an ISO-8601 string.
    with open(filename, 'w') as fp:
        fp.write(context['ts'])

save_exec_date_task = PythonOperator(
    task_id='save_exec_date',
    python_callable=save_exec_date,
    provide_context=True,
    op_kwargs={'filename': 'exec_date.txt'},
    dag=dag)

Create a Date in Airflow’s Execution_Date Format (ISO with Time Zone)

If you are using Apache Airflow and you find a need to make a date in order to compare it to the airflow execution date, here is a simple/clean way of doing it.

Airflow’s dates appear to be in the ISO standard format, with a time zone qualifier.  From their docs:

Support for time zones is enabled by default. Airflow stores datetime information in UTC internally and in the database. It allows you to run your DAGs with time zone dependent schedules. At the moment Airflow does not convert them to the end user’s time zone in the user interface. There it will always be displayed in UTC.

The execution_date will always be in UTC.  So, this piece of code should always work to get the current time in airflow execution_date’s time format:

from datetime import datetime, timezone

current_time = datetime.now(timezone.utc).isoformat('T')

Also, you should note that these dates appear in this format:

2019-12-05T14:30:00+00:00

This is great because it means that you can compare them as strings since the numbers are all most-significant-first and 24-hour based.
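As a quick sanity check of that string-comparison property, this sketch compares two UTC timestamps lexicographically:

```python
from datetime import datetime, timedelta, timezone

now = datetime.now(timezone.utc)
one_hour_ago = now - timedelta(hours=1)

# Because ISO-8601 UTC strings are most-significant-first,
# lexicographic order matches chronological order.
assert one_hour_ago.isoformat('T') < now.isoformat('T')
```

This only holds because both sides are in UTC with the same precision; comparing strings across time zones would not work.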

Is Presto Slow When Returning Millions of Records / Big Result Set?

What Was Our Problem?

We were having issues with people reporting that Presto was slow when they were exporting hundreds of millions of records from much larger tables.  The queries were simple where-clause filters selecting a few fields from some hundred-billion-record tables.

Was It Actually Slow?

No! At least, not when parallelized well and tested properly.  I wrote a Java app to:

  • Split a query into N = 100 parts by using where clauses with a modulus on an integer column.
  • Query Presto in parallel with 30 threads, going through the 100 queries.
  • Output results to standard out.
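The query-splitting step can be sketched as follows (in Python for brevity, though our app was Java; the table and column names here are made up for illustration, and run_query is a stand-in for a real Presto client call):

```python
from concurrent.futures import ThreadPoolExecutor

def build_split_queries(n_parts=100):
    # Each query covers one modulus bucket of an integer column,
    # so together the N queries cover every row exactly once.
    return [
        f"SELECT col_a, col_b FROM my_table WHERE some_int_col % {n_parts} = {i}"
        for i in range(n_parts)
    ]

def run_query(sql):
    # Placeholder for a real Presto client call; here it just returns
    # the query length so the sketch is runnable.
    return len(sql)

queries = build_split_queries(100)
with ThreadPoolExecutor(max_workers=30) as pool:
    results = list(pool.map(run_query, queries))
```

With 30 workers and 100 buckets, each thread picks up a new bucket as soon as it finishes one, which keeps the cluster saturated without any one query dominating.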

In an external bash script, I also grepped the results just to show some statistics I output.

This was slow!

Why Was it Slow?

First of all, let’s talk about the presto cluster setup:

  • 1 coordinator.
  • 15 workers.
  • All m5.8xlarge = 128GB RAM / 32 processor cores.

This is pretty decent.  So, what could our bottlenecks be?

  1. Reading from s3.
  2. Processing results in workers.
  3. Slow coordinator due to having 15 workers talk through it.
  4. Slow consumer (our client running the queries).

To rule out 1/2/3 respectively I:

  1. Did a count(*) query which would force a scan over all relevant s3 data.  It came back pretty fast (in 15 seconds or so).
  2. Added more workers.  Having more workers had minimal effect on the final timings, so we’re not worker-bound.
  3. Switched the coordinator to a very large, compute-optimized node type.  This had minimal effect on the timings as well.

So, the problem appears to be with the client!

Why Was the Client Slow?

Our client really wasn’t doing a lot.  It was running 30 parallel queries and outputting the results, which were being grepped.  It was a similarly sized node to our presto coordinator, and it had plenty of CPU, RAM, decent network and disks (EBS).

It turned out though that once we stopped doing the grep and once we stopped writing the results to stdout, and we just held counters/statistics on the results we read, it went from ~25 minutes to ~2 minutes.

If we had run this in Spark or some other engine with good parallel behavior, we would have seen the workload distribute better over more nodes with sufficient ability to process their portions of the records in parallel.  But, since we were running on a single node that received all of the results, the threads/CPU and other resources we were using capped out and could not go any faster.

Note: we did not see the client server as having high utilization, but some threads were at 100%.  So, the client app likely had a bottleneck we could avoid if we improved it.


So… next time you think presto can’t handle returning large numbers of results from the coordinator, take some time to evaluate your testing methodology.  Presto isn’t designed to route hundreds of millions of results, but it does it quite well in our experience.


TMUX Easy Hotkeys + Lean Command Prompt

This is just a simple note of some TMUX and BASH profile configuration that I find helpful when developing.

Tmux lets you split screens arbitrarily.  So, you can divide one terminal up into, say, one half-screen, full-width pane up top and two quarter-screen panes on the bottom.  Generally, you have to hop around by pushing ctrl + b and then a key (like % or ") to split a screen into multiple panes horizontally or vertically.  Equally, you do ctrl + b and an arrow key to move between panes.

Two problems with this are:

  • ctrl+b is an awful hotkey; it’s hard to reach and you’ll use it constantly.
  • Once you end up in quarter-size screens, even on big monitors, your default terminal prompt (the thing that shows up before you type commands) becomes too large for comfortable use.

TMUX Config

Edit (or create) the file ~/.tmux.conf to customize tmux.  Then add these lines.  The first two switch the ctrl + b hotkey to ctrl + a (which is much more convenient for constant use).  The others are just nice fixes to have around for other reasons.

# remap prefix from 'C-b' to 'C-a'
unbind C-b
set-option -g prefix C-a

# Fix some annoying behavior that makes some keys not work.
bind-key C-a send-prefix

# Start window numbering at 1
set -g base-index 1

Bash Profile Prompt Config

Edit your ~/.bashrc file (your bash profile).  If a line for PS1= already exists, you’ll want to comment it out or modify it, otherwise just add this in:

PS1="\u:\W$ "

This changes your terminal prompt to only show your username and the current directory’s base name.  So, for example, if my name is john and I’m working in a folder called /opt/code, it would just display john:code vs a full path name with additional info (which takes up way too much space on quarter-size panes).
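If you want a bit more context back in the prompt, a couple of other standard bash PS1 escapes are worth knowing (these are documented bash prompt escapes, shown here with hypothetical output):

```shell
PS1="\u@\h:\W$ "   # adds the hostname, e.g. john@devbox:code$
PS1="\u:\w$ "      # lowercase \w shows the full path, e.g. john:/opt/code$
```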

I hope you find this useful, and thanks for reading!