Calculating Unit Response per Shift Using Apache Spark

Most fire department schedules use shifts that are some multiple of 24 hours. A common schedule is a 3-shift schedule in which a person works 24 hours on duty followed by 48 hours off. The shifts are labeled A-shift, B-shift and C-shift.

If a shift starts at 7:00 A.M., as my department's does, it ends at 7:00 A.M. the next morning. So, for example, if Saturday, August 26, 2017 is an A-shift day, that shift continues until Sunday, August 27, 2017 at 07:00, and any call between those two dates and times is considered to be on A-shift.

The database that my department uses doesn't keep track of shifts; it only tracks dates and times. Figuring out which shift a given day falls on is fairly easy and not computationally expensive: read a row from the database, send the date and time to a function, and get back a shift letter.

The expensive part comes when you want to determine the calls run by each unit per shift and calculate, say, the average time it takes a unit to go en route on that shift. It can be done in a non-distributed way, but building further queries on top of the shift data then becomes nearly impossible without first saving it to a database table.

This post shows how this was all accomplished using Apache Spark.

In [16]:
# Import packages that will be needed

import pandas as pd
from datetime import datetime, timedelta, date

import pyspark.sql.functions as func
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf, col
from pyspark.sql.types import *

Create a function to calculate the start date of a shift and the letter for that shift.

The reason for capturing the shift start date is this: given the August 26th A-shift example above, if a unit runs a call on August 27, 2017 at 2:15 A.M., that call is still considered A-shift. If the start date weren't calculated now, it would probably have to be calculated later anyway in order to gather all the calls run on the A-shift that started on August 26th.

Luckily for me, the designers of the database got something right and captured call dates and times as epoch timestamps. An epoch timestamp is the total number of seconds that have passed since midnight UTC on January 1, 1970.
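
For example, using the datetime import from the cell above, this is all it takes to turn an epoch timestamp back into a date and time (utcfromtimestamp interprets the value as UTC):

# 1430036674 is the same timestamp used in the check further below
print(datetime.utcfromtimestamp(1430036674))   # 2015-04-26 08:24:34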

In [17]:
def calc_shift(unix_ts=None):
    """
    Calculates the shift letter and the date for
    the start of a given timestamp.
    
    Returns a tuple (start_date, shift_letter)
    
    Assumptions:
        - Based on a 3 shift rotation
        - A-shift was on Jan 1, 2017
        - Shift starts at 07:00
    """
    
    a_shift = date(2017, 1, 1)
    
    if not unix_ts:
        return None
     
    dt = datetime.utcfromtimestamp(int(unix_ts))

    # make input into a date object.  Date objects
    # make it easier to calc number of days between two dates
    d = date(dt.year, dt.month, dt.day)
    delta = (d - a_shift).days
    mod = delta % 3
    
    # if mod equals:
    #    0 --> A shift if hour >= 7, C-shift if before
    #    1 --> B shift if hour >= 7, A-shift if before
    #    2 --> C shift if hour >= 7, B-shift if before
    
    hour = dt.hour
    shift_start_date = d
    
    if mod == 0:
        if hour >= 7:
            shift = 'A'
        else:
            shift = 'C'
            shift_start_date = d - timedelta(days=1)
    elif mod == 1:
        if hour >= 7:
            shift = 'B'
        else:
            shift = 'A'
            shift_start_date = d - timedelta(days=1)
    elif mod == 2:
        if hour >= 7:
            shift = 'C'
        else:
            shift = 'B'
            shift_start_date = d - timedelta(days=1)
            
    shift_start_date_str = shift_start_date.strftime('%Y-%m-%d')
    
    return (shift_start_date_str, shift)
In [18]:
# Check if it works.  Using www.epochconverter.com the following timestamp evaluates to April 26, 2015 8:24:34 A.M.
# Then checking a shift calendar that corresponds to my shifts confirms that based on that date and time
# it is a C-shift

print(calc_shift(1430036674))
('2015-04-26', 'C')
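
As a further check on the early-morning rollover described earlier, a call on August 27, 2017 at 2:15 A.M. should come back as the A-shift that started on August 26. Here is a quick sanity check of my own, building the epoch timestamp with calendar.timegm so it is interpreted as UTC, the same way calc_shift reads it:

import calendar

# 2017-08-27 02:15, treated as UTC to mirror utcfromtimestamp inside calc_shift
ts = calendar.timegm(datetime(2017, 8, 27, 2, 15).timetuple())
print(calc_shift(ts))   # expected: ('2017-08-26', 'A')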
In [19]:
# sc is the SparkContext provided by the PySpark notebook/shell
sql_sc = SQLContext(sc)

Read in the data from a CSV file. The data is stored on a Network File System (NFS) share so the master and worker nodes have access to the same data.

The DataFrame is then cached so it doesn't have to be reloaded by Apache Spark every time it is used.

In [20]:
data_df = sql_sc.read.load('/var/sparkdata/cad/fire/July2017-fire-response-times-BCFR.csv',
                            format='com.databricks.spark.csv',
                            header='true',
                            inferSchema='true'
                            )
data_df.cache()
Out[20]:
DataFrame[CALL_CREATED_DATE: timestamp, CALL_CREATED_TIME: int, UNIT_ID: string, CALL_TYPE_FINAL_D: string, UNIT_DISPATCH_DATE: timestamp, UNIT_DISPATCH_TIME: int, UNIT_ENROUTE_DATE: timestamp, UNIT_ENROUTE_TIME: int, response_time: int, CALL_CREATED_INT: int]
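
As an aside, on Spark 2.x the built-in CSV reader can do the same thing through a SparkSession; a minimal sketch, assuming a SparkSession named spark is already available:

# same file, read with the Spark 2.x SparkSession API instead of SQLContext
df2 = spark.read.csv('/var/sparkdata/cad/fire/July2017-fire-response-times-BCFR.csv',
                     header=True, inferSchema=True)
df2.cache()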

As written above, calc_shift is just a plain Python function. In order for Apache Spark to use it to generate column data, similar to what you would find in a database, Spark needs it wrapped as a UDF, or User Defined Function. Below I define the return type Apache Spark can expect from the function.

In [21]:
# calc_shift returns a tuple but Spark can't create multiple columns from a single UDF call
# create a new struct to make a return type for a UDF
# see 
# https://stackoverflow.com/questions/35322764/apache-spark-assign-the-result-of-udf-to-multiple-dataframe-columns
shift_schema = StructType([
    StructField("shift_start_date", StringType(), True),
    StructField("shift_letter", StringType(), True)
])

calc_shift_udf = udf(calc_shift, shift_schema)

Now I create a new DataFrame from the data_df that was read in from the file above. Using the UDF defined above, I tell Spark to create a new column called shift_letter and fill it with the UDF's return value. The UDF takes "call_created_int" as its argument; each row's CALL_CREATED_INT value is the epoch timestamp stored in the actual database.

In [22]:
shift_df = data_df.withColumn("shift_letter", calc_shift_udf("call_created_int"))

Below is the schema after adding the new column. Note that the new shift_letter column is a struct holding two fields, so effectively two new values were created per row.

In [23]:
shift_df.printSchema()
root
 |-- CALL_CREATED_DATE: timestamp (nullable = true)
 |-- CALL_CREATED_TIME: integer (nullable = true)
 |-- UNIT_ID: string (nullable = true)
 |-- CALL_TYPE_FINAL_D: string (nullable = true)
 |-- UNIT_DISPATCH_DATE: timestamp (nullable = true)
 |-- UNIT_DISPATCH_TIME: integer (nullable = true)
 |-- UNIT_ENROUTE_DATE: timestamp (nullable = true)
 |-- UNIT_ENROUTE_TIME: integer (nullable = true)
 |-- response_time: integer (nullable = true)
 |-- CALL_CREATED_INT: integer (nullable = true)
 |-- shift_letter: struct (nullable = true)
 |    |-- shift_start_date: string (nullable = true)
 |    |-- shift_letter: string (nullable = true)

Here I create another DataFrame that flattens the struct, making it easier to access the shift_start_date and shift_letter values, and register it as a temporary view so it can be queried with SQL.

In [24]:
shift2_df = shift_df.withColumn("shift_start_date", col("shift_letter.shift_start_date")) \
              .withColumn("shift", col("shift_letter.shift_letter")) \
              .drop("shift_letter")
shift2_df.createOrReplaceTempView("calls_with_shift")
In [25]:
shift2_df.select("*").show(5)
+-------------------+-----------------+-------+--------------------+-------------------+------------------+-------------------+-----------------+-------------+----------------+----------------+-----+
|  CALL_CREATED_DATE|CALL_CREATED_TIME|UNIT_ID|   CALL_TYPE_FINAL_D| UNIT_DISPATCH_DATE|UNIT_DISPATCH_TIME|  UNIT_ENROUTE_DATE|UNIT_ENROUTE_TIME|response_time|CALL_CREATED_INT|shift_start_date|shift|
+-------------------+-----------------+-------+--------------------+-------------------+------------------+-------------------+-----------------+-------------+----------------+----------------+-----+
|2017-07-01 00:00:00|            75133|    E81|COMMERICAL FIRE A...|2017-07-01 00:00:00|             75235|2017-07-01 00:00:00|            75422|          107|      1498895493|      2017-07-01|    B|
|2017-07-01 00:00:00|            75133|    S82|COMMERICAL FIRE A...|2017-07-01 00:00:00|             75235|2017-07-01 00:00:00|            75338|           63|      1498895493|      2017-07-01|    B|
|2017-07-01 00:00:00|            75133|  TRK82|COMMERICAL FIRE A...|2017-07-01 00:00:00|             75235|2017-07-01 00:00:00|            75338|           63|      1498895493|      2017-07-01|    B|
|2017-07-01 00:00:00|           133333|    E44|          BRUSH FIRE|2017-07-01 00:00:00|            133549|2017-07-01 00:00:00|           133812|          143|      1498916013|      2017-07-01|    B|
|2017-07-01 00:00:00|           170914|    E48|  Miscellaneous Fire|2017-07-01 00:00:00|            171053|2017-07-01 00:00:00|           171210|           77|      1498928954|      2017-07-01|    B|
+-------------------+-----------------+-------+--------------------+-------------------+------------------+-------------------+-----------------+-------------+----------------+----------------+-----+
only showing top 5 rows

Below I use DataFrame notation to calculate the average en route time for each shift, combining all units. I also calculate the number of calls that were run on that shift.

(NOTE: When I was first trying to figure this out I mistakenly called it RESPONSE TIME. I am actually calculating EN ROUTE time, or as some call it, CHUTE time: the time from when the unit gets the call to when it actually starts toward the call.)

In [26]:
# Calculate the average response time for each shift and number of calls for the shift
# https://stackoverflow.com/questions/41890485/aggregate-function-count-usage-with-groupby-in-spark
avg_count_df = shift2_df.select("shift_start_date", "shift", "response_time") \
    .groupby("shift_start_date", "shift") \
    .agg(func.round(func.mean("response_time"), 2).alias("Avg (secs)"),
         func.count("shift_start_date").alias("Num Calls")) \
    .sort(col("shift_start_date").asc())  # sort after the aggregation so the output order is guaranteed

avg_count_df.show(5)
+----------------+-----+----------+---------+
|shift_start_date|shift|Avg (secs)|Num Calls|
+----------------+-----+----------+---------+
|      2017-07-01|    B|     70.75|       16|
|      2017-07-02|    C|    103.57|       21|
|      2017-07-03|    A|     68.71|       14|
|      2017-07-04|    B|      69.6|       10|
|      2017-07-05|    C|     168.0|        5|
+----------------+-----+----------+---------+
only showing top 5 rows

I prefer SQL notation, and I found it easier to calculate the average time for each unit on each shift with SQL than with the DataFrame API. In fact I couldn't work out the per-unit version in DataFrame notation, but I prefer SQL anyway.
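
For reference, a rough DataFrame-notation equivalent of the SQL query below would look something like this sketch (my addition, not code I actually ran against this data):

unit_by_shift_alt_df = shift2_df.groupby("unit_id", "shift_start_date", "shift") \
    .agg(func.count("unit_id").alias("num_calls"),
         func.round(func.avg("response_time"), 2).alias("avg_resp")) \
    .orderBy("shift_start_date", "unit_id")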

In [28]:
query = (
"SELECT shift_start_date, shift, unit_id, COUNT(unit_id), ROUND(AVG(response_time),2) AS avg_resp "
"FROM calls_with_shift "
"GROUP BY unit_id, shift_start_date,shift "
"ORDER BY shift_start_date, unit_id ASC"
)
unit_by_shift_df = sql_sc.sql(query)

Below is the output of the query above. I arbitrarily chose to show 10 rows, which happens to cover every unit that ran a call on the shift that started July 1, 2017. If you add up the count column you will find it sums to 16, the same as the Num Calls value for that shift in the output above.

In [29]:
unit_by_shift_df.select("*").show(10)
+----------------+-----+-------+--------------+--------+
|shift_start_date|shift|unit_id|count(unit_id)|avg_resp|
+----------------+-----+-------+--------------+--------+
|      2017-07-01|    B|    E26|             1|    20.0|
|      2017-07-01|    B|    E29|             1|    86.0|
|      2017-07-01|    B|    E41|             3|   57.67|
|      2017-07-01|    B|    E42|             1|    47.0|
|      2017-07-01|    B|    E43|             2|    80.0|
|      2017-07-01|    B|    E44|             2|   118.5|
|      2017-07-01|    B|    E48|             1|    77.0|
|      2017-07-01|    B|    E81|             1|   107.0|
|      2017-07-01|    B|    S82|             2|    61.5|
|      2017-07-01|    B|  TRK82|             2|    51.0|
+----------------+-----+-------+--------------+--------+
only showing top 10 rows
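
If you would rather confirm that total programmatically than by adding the column up by hand, a quick check against the temp view (a sketch of my own) would be:

sql_sc.sql(
    "SELECT COUNT(*) AS num_calls FROM calls_with_shift "
    "WHERE shift_start_date = '2017-07-01'"
).show()
# should report 16 for the shift that started July 1, 2017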

I convert the data to pandas so I can save it to a CSV file, put it in a spreadsheet, and give it to those who are interested. I tried to create a CSV file directly from the Spark DataFrame but got errors, so I switched to pandas and used its built-in function.

In [30]:
unit_by_shift_df.select("*").toPandas().to_csv('/var/sparkdata/cad/fire/July2017-bcfr-avg-unit-resp-fire.csv')
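
For the record, a direct write from Spark would normally look like the sketch below (assuming Spark 2.x). It produces a directory of part files rather than a single CSV, which is one reason the pandas route is more convenient for handing a single file to someone.

# illustrative output directory name; Spark writes part files inside it
unit_by_shift_df.coalesce(1).write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv('/var/sparkdata/cad/fire/July2017-bcfr-avg-unit-resp-fire-spark')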

Summary

In this blog I showed how Apache Spark can be used to help calculate shift dates. Given only a few calls, Apache Spark would be overkill. But I used this very same code to generate shift dates, shift letters and average en route times for a file containing 25,222 calls over roughly a 3 year period. It completed in nearly the blink of an eye, and that included writing the results to a CSV file.

In the process I showed how to create a User Defined Function (UDF), return a tuple containing two values and incorporate them into a dataframe to be used for further analysis.

The hardware I used was two HP servers: one with 2 processors, 12 cores and 128 GB of RAM, the other with 1 processor, 6 cores and 16 GB of RAM. Both servers run the CentOS 7 operating system. The master node is an HP desktop with 4 cores and 8 GB of RAM running Arch Linux. Each system has its own copy of Apache Spark, and I've set up a Network File System share for the data.

This analysis came about because of multiple requests I have had in the recent past for this type of data. No longer will I have to tell someone that the answer can be determined but that I can't produce it because I don't have enough computing power and/or time.
