Using Amazon Redshift Spectrum, Athena and Glue with Node.js – Lessons learned after 60 days in Production

In AdTech Data is King

We, at NUVIAD, have been using Amazon Redshift as our main data warehouse solution for more than 3 years. Over these 3 years we saw our cluster grow from 3 nodes to 65 nodes, storing massive amounts of transaction data that our users needed to access frequently.

When running Real-Time Bidding (RTB) campaigns at large scale, data freshness is critical for our customers and partners. Fresh data means that our users can decide faster and respond more quickly to changes in campaign performance. This was the main reason we allowed our users to query the data stored in Amazon Redshift regardless of the way they run campaigns – whether through our dashboard or using APIs, our goal is to present the most up-to-the-minute information.

As the number of customers grew, so did our cluster, and our development team periodically had to address the growing demand for up-to-date data on the performance of the different campaigns.

Trying to balance cost and performance meant that our cluster operated at very high CPU utilization most of the time, with frequent peaks of 100% CPU across all nodes.

Adding nodes in a production environment is also a delicate task, requiring data migration and even downtime while the resizing process runs.

Things got much simpler with the introduction of Amazon Redshift Spectrum.

In this blog post I will share our experience and insights using Redshift Spectrum, Glue and Athena. I will also provide code samples in Node.js pseudo code, focusing on the logic and ideas behind the process rather than on detailed copy/paste-ready code.

What is Amazon Redshift Spectrum?

In a nutshell, Redshift Spectrum (or Spectrum, for short) is the Amazon Redshift query engine running on data stored on S3. It is a new feature of Amazon Redshift that gives you the ability to run SQL queries using the Redshift query engine, without being limited by the number of nodes in your Amazon Redshift cluster.

Exabyte scale

The ability to provide fresh, up-to-the-minute data to our customers and partners was always a main goal of our platform. While other solutions provided data that was a few hours old, we insisted on providing the most up-to-date data – which meant allowing our customers to query Amazon Redshift directly and analyze their data in near real time.

The benefits were immediately evident: customers could see how their campaigns performed in real time and react to the ever-changing media supply pricing and availability. However, the impact on the cluster was evident as well – it was pretty much always at 90% CPU utilization.

Furthermore, this approach required the cluster to store data for long periods. At our peak, we maintained a Redshift cluster running 65 dc1.large nodes, and even this configuration was limiting for us.

Migrating our data warehouse to Redshift Spectrum allowed us to overcome these two main limitations. Having S3 as the storage engine allows practically limitless data capacity, and the Spectrum distributed compute engine, running over thousands of nodes, provides superior performance and a separation between compute and storage that is very important for flexibility, scale and cost. Furthermore, since Spectrum distributes the query across potentially thousands of nodes, queries are not affected by other queries, providing much more stable query performance and higher concurrency compared to a “traditional” Redshift cluster.

Keeping it SQL

Redshift Spectrum uses the same query engine as Redshift – this means that we did not need to change our BI tools or our query syntax, whether we used complex queries across a single table or ran joins across multiple tables. Another interesting addition introduced recently is the ability to create a view that spans both Amazon Redshift tables and Redshift Spectrum external tables.
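
For example, a late-binding view can union recent rows kept in the cluster with historical rows sitting behind a Spectrum external table. The following is a minimal sketch rather than our production code – the table names and connection details are placeholders, and we use the node 'pg' module since Redshift speaks the Postgres wire protocol:

const { Client } = require('pg');

// A late-binding view that unions rows stored in Redshift with rows stored on S3
// behind a Spectrum external table. 'public.recent_clicks' is a hypothetical local table.
const createViewSql = `
    create view unified_clicks as
    select user_id, campaign_id, ts, billing from public.recent_clicks
    union all
    select user_id, campaign_id, ts, billing from spectrum.blog_clicks
    with no schema binding;
`;

const client = new Client({ /* Redshift cluster connection settings */ });
client.connect()
    .then(() => client.query(createViewSql))
    .then(() => client.end())
    .catch(console.error);

The 'with no schema binding' clause is what allows the view to reference an external table.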

Seamless scaling

When running a “traditional” Amazon Redshift cluster, adding nodes may require either stopping the cluster while it resizes, or taking a snapshot, restoring the snapshot to a new cluster, performing the resize on the new cluster and then switching the endpoints to the new cluster. The former results in a long service disruption, while the latter takes more time and costs more money but results in a shorter service disruption.

Scaling Redshift Spectrum is a much simpler process. First, it is practically unlimited in storage since all the data is stored on S3. If you need more compute power, you can simply launch a new cluster. All your Spectrum clusters access the same catalog, so you don't need to worry about data migration at all, making scaling practically effortless and seamless.

Redshift Spectrum – Athena on Steroids or is it the other way around?

The obvious question is where Spectrum is positioned relative to Redshift and Athena. On one hand, Spectrum uses the same query engine as Redshift; on the other hand, it uses data stored on S3 – similarly to Athena.

During the migration phase, we had our dataset stored in Redshift, on S3 as gzipped CSV, and as Parquet files, so we performed benchmarks for simple and complex queries on one month's worth of data.

We tested 5 different configurations:

  1. Redshift cluster with 28 dc1.large nodes
  2. Spectrum using gzip CSV
  3. Spectrum using Parquet
  4. Athena using gzip CSV
  5. Athena using Parquet

We tested both how much time it took to perform the query and how consistent the results were when running the same query a few times. The data we used for the tests was already partitioned by date and hour. Partitioning the data significantly affects performance and improves query times.

Simple Query

First, we tested a simple query aggregating billing data across the month:

select 
  user_id, 
  count(*) as impressions, 
  sum(billing)::decimal /1000000 as billing 
from 
  table_name 
where 
  date >= '2017-08-01' and 
  date <= '2017-08-31'  
group by 
  user_id;

We ran the same query 7 times and measured the response times (red marking the longest time and green the shortest time):

It is clear how much more efficient the Parquet format is than CSV. Furthermore, Spectrum outperforms traditional Redshift and shows high consistency in execution time, with a small difference between the slowest and fastest runs.

Comparing the data scanned when using GZIP CSV and Parquet – the differences are also significant:

Since you pay only for the data scanned by each query, the cost saving is evident and substantial.

Complex Algo Query

Next, we compared how our algorithm calculations perform in these configurations. Given the complexity of the query, we could not run it on Athena at all, so we had to compare only Redshift to Spectrum:

In this case, Spectrum using Parquet outperformed Redshift – cutting the run time by about 80% (!!!)

Bottom line: Since Spectrum and Athena are using the same data catalog, we could utilize the speed of Athena for simple queries and enjoy the benefit of running complex queries using Redshift’s query engine on Spectrum.

Optimizing data structure in Amazon S3

On average, we have 800 instances that process our traffic. Each instance needs to send events that are eventually loaded into Redshift. Two of the most common ways to load data to Redshift are:

  1. Use Amazon Kinesis Firehose to offload data directly to Redshift.
  2. Offload the data from each server to S3 and then perform a periodical copy command from S3 to Redshift.

When we initially implemented the data offloading process, AWS Firehose did not support copy options or using your own copy command. As a result, if a single event failed to copy to Redshift, the entire transaction failed. So we ended up using method No. 2 and specifying a ‘MAXERROR 999’ option in the copy command. We preferred to lose a few events out of the thousands we had every minute rather than lose the entire transaction. Although Firehose was updated some time ago with the ability to add copy command options, we kept the same collection process, which worked flawlessly for almost 3 years.
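
For reference, this is roughly the shape of that periodic copy. The sketch below is simplified – the table name, bucket path and IAM role ARN are placeholders:

const { Client } = require('pg');

// Pipe-delimited, gzipped files are loaded from S3; up to 999 bad records are
// skipped instead of failing the whole load.
const copySql = `
    copy events
    from 's3://nuviad-events/rtb/'
    iam_role 'arn:aws:iam::123456789012:role/redshift-copy-role'
    gzip
    delimiter '|'
    maxerror 999;
`;

const client = new Client({ /* Redshift cluster connection settings */ });
client.connect()
    .then(() => client.query(copySql))
    .then(() => client.end())
    .catch(console.error);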

This, obviously, changed when we moved to Spectrum.

With Spectrum, we needed to find a way to do the following:

 

  1. Collect the event data from the instances.
  2. Save the data in Parquet format.
  3. Partition the data effectively.

The best option would be if AWS Firehose could save data as Parquet. Unfortunately, this is not available yet, so we need to save the data as CSV and then transform it into Parquet.

The most effective method we found to generate the Parquet files is to run the following steps:

  1. Send the data from the instances to Amazon Kinesis Firehose with S3 temporary bucket as the destination in one minute intervals.
  2. Aggregate hourly data and convert it to Parquet using Lambda and AWS Glue.
  3. Add the Parquet data to Spectrum by updating the table partitions.

With this new process, we had to pay more attention to validating the data before sending it to Amazon Kinesis Firehose, since a single corrupted record in a partition will fail queries on that partition. In the past, we could “rely” on the Redshift copy command to filter out errors, but now we need to verify the data ourselves.

Data Permutations

One of the key realizations we had while testing various Spectrum configurations is that, since the cost of S3 is relatively cheap and you pay only for the data scanned by each query, it may make sense to keep your data in the layout that is most effective for each of your different workloads.

For example, we have a process that runs every minute and generates statistics for the last minute of data collected. In the Redshift era, this would be done by running a query on the table with something like the following:

select 
  user, 
  count(*) 
from 
  events_table 
where 
  ts between '2017-08-01 14:00:00' and '2017-08-01 14:00:59' 
group by 
  user;

(assuming ‘ts’ is your column storing the timestamp for each event.)

With Spectrum, you pay for the data scanned by each query, so if the data is partitioned by, let's say, date and hour, you pay on average about 30 times more than you need: you only need a single minute of data, but the hourly partition you scan holds, on average, about 30 minutes' worth by the time you query it.

But if we use a temporary table that points only to the data of the last minute, we save that unnecessary cost. It is important to note that you can have any number of tables pointing to the same data on S3 – it all depends on how you partition the data and update the table partitions.

You can even store the Firehose data in one bucket, process it, and move the output data to a different bucket – whatever works for your workload.
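
One way to implement such a "last minute" table is to keep a tiny external table whose single partition is repointed at the folder holding only the most recent minute of data. The sketch below is one possible approach rather than our exact code – the table and folder names are hypothetical, it assumes the partition was added once up front, and it goes through the Athena client since Spectrum and Athena share the same data catalog:

const AWS = require('aws-sdk');
const athena = new AWS.Athena({ region: 'us-east-1' });

// Repoint the single partition of the small "last minute" table at the folder
// that holds only the most recent minute of data.
function repointLastMinuteTable(date, hour, minute, callback) {
    const location = `s3://nuviad-temp/events-minute/${date}/${hour}/${minute}/`;
    const params = {
        QueryString: `alter table last_minute_events partition (date='${date}', hour=${hour}) ` +
                     `set location '${location}'`,
        QueryExecutionContext: { Database: 'spectrumdb' },
        ResultConfiguration: { OutputLocation: 's3://nuviad-temp/athena-results/' }
    };
    athena.startQueryExecution(params, callback);
}

Queries against this table then scan a single minute of data instead of the whole hourly partition.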

Data Validation

Let's say that we want to store our click data in a table. Consider the following SQL create table command:

create external TABLE spectrum.blog_clicks (
    user_id varchar(50),
    campaign_id varchar(50),
    os varchar(50),
    ua varchar(255),
    ts bigint,
    billing float
)
partitioned by (date date, hour smallint)
stored as parquet
location 's3://nuviad-temp/blog/clicks/';

The above statement defines a new external table (all Redshift Spectrum tables are external tables) with a few attributes. Note that we store 'ts' as a Unix timestamp and not as a timestamp type, and that billing is stored as float – not decimal (more on that later). We also specify that the data is partitioned by date and hour, stored as Parquet, and located at the given S3 path.

First, we need to get the table definition. This can be achieved by running the following query:

select 
  * 
from 
  svv_external_columns 
where 
  tablename = 'blog_clicks';

This query lists all the columns in the table with their respective definitions.

Now we can use this data to create a validation schema for our data:

const rtb_request_schema = {
    "name": "clicks",
    "items": {
        "user_id": {
            "type": "string",
            "max_length": 100
        },
        "campaign_id": {
            "type": "string",
            "max_length": 50
        },
        "os": {
            "type": "string",
            "max_length": 50
        },
        "ua": {
            "type": "string",
            "max_length": 255
        },
        "ts": {
            "type": "integer",
            "min_value": 0,
            "max_value": 9999999999999
        },

        "billing": {
            "type": "float",
            "min_value": 0,
            "max_value": 9999999999999
        }
    }
};

and create a function which uses this schema to validate data:

// Validates a single value against its column schema (see rtb_request_schema above).
function valueIsValid(value, item_schema) {
    if (item_schema.type == 'string') {
        return (typeof value == 'string' && value.length <= item_schema.max_length);
    }
    else if (item_schema.type == 'integer') {
        return (typeof value == 'number' && value >= item_schema.min_value && value <= item_schema.max_value);
    }
    else if (item_schema.type == 'float' || item_schema.type == 'double') {
        return (typeof value == 'number' && value >= item_schema.min_value && value <= item_schema.max_value);
    }
    else if (item_schema.type == 'boolean') {
        return typeof value == 'boolean';
    }
    else if (item_schema.type == 'timestamp') {
        // Anything Date can parse into a positive epoch value is accepted.
        return (new Date(value)).getTime() > 0;
    }
    else {
        return true;
    }
}
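
A record can then be checked field by field before it is sent to Firehose. This is a minimal helper built on top of valueIsValid, assuming the record is an object keyed by column name:

// Returns true only if every field defined in the schema passes validation.
function recordIsValid(record, schema) {
    return Object.keys(schema.items).every(function(field) {
        return valueIsValid(record[field], schema.items[field]);
    });
}

// Example:
// recordIsValid({ user_id: 'u1', campaign_id: 'c1', os: 'ios', ua: 'Mozilla/5.0',
//                 ts: 1501581600, billing: 0.25 }, rtb_request_schema);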

Amazon Kinesis Firehose

On Amazon Kinesis Firehose, we created a new delivery stream to handle the events as follows:

Delivery stream name: events
Source: Direct PUT
S3 bucket: nuviad-events
S3 prefix: rtb/
IAM role: firehose_delivery_role_1
Data transformation: Disabled
Source record backup: Disabled
S3 buffer size (MB): 100
S3 buffer interval (sec): 60
S3 Compression: GZIP
S3 Encryption: No Encryption
Status: ACTIVE
Error logging: Enabled

 

This delivery stream buffers events for one minute or until 100 MB accumulates, and then writes them to the S3 bucket as a GZIP-compressed file.

Next, once the data is validated, we can safely send it to our Firehose API:

 

if (validated) {
    let itemString = item.join('|') + '\n'; // CSV delimited by pipe, with a trailing newline
    let params = {
        DeliveryStreamName: 'events',
        Record: {
            Data: itemString
        }
    };
    firehose.putRecord(params, function(err, data) {
        if (err) {
            console.error(err, err.stack);
        }
        else {
            // Continue to your next step
        }
    });
}

Now we have a single CSV file representing one minute of event data stored in S3. However, the files are named automatically by Firehose: Kinesis Firehose adds a UTC time prefix in the format YYYY/MM/DD/HH before writing objects to Amazon S3. Since we use date and hour as partitions, we need to change the file naming and location to fit our Spectrum schema.

Using AWS Lambda to distribute data

We created a simple Lambda function that is triggered by an S3 put event and copies the file to a different location (or locations), while renaming it to fit our data structure and processing flow.

As mentioned before, the files generated by Firehose are structured in a pre-defined hierarchy such as:

S3://your-bucket/your-prefix/2017/08/01/20/events-4-2017-08-01-20-06-06-536f5c40-6893-4ee4-907d-81e4d3b09455.gz

All we need to do is parse the object name and restructure it as we see fit. In our case, we did the following (the event is the object received by the Lambda function, with all the data about the object written to S3):

 

/*
object key structure in the event object:
    your-prefix/2017/08/01/20/events-4-2017-08-01-20-06-06-536f5c40-6893-4ee4-907d-81e4d3b09455.gz
*/
let key_parts = event.Records[0].s3.object.key.split('/');
let event_type = key_parts[0];                                      // the S3 prefix, e.g. 'rtb'
let date = key_parts[1] + '-' + key_parts[2] + '-' + key_parts[3];  // YYYY-MM-DD from the folder path
let hour = key_parts[4];
if (hour.indexOf('0') == 0) {
    hour = parseInt(hour, 10) + '';                                 // strip a leading zero, e.g. '06' -> '6'
}

// The file name follows the Firehose pattern <stream>-<version>-YYYY-MM-DD-HH-MM-SS-<uuid>.gz,
// so after splitting on '-' the minute sits at index 6 (adjust if your stream name contains dashes).
let parts1 = key_parts[5].split('-');
let minute = parts1[6];
if (minute.indexOf('0') == 0) {
    minute = parseInt(minute, 10) + '';                             // strip a leading zero
}

 

Now, we can redistribute the file to two destinations we need – one for the minute processing task and the other for hourly aggregation:

 

    copyObjectToHourlyFolder(event, date, hour, minute)
        .then(copyObjectToMinuteFolder.bind(null, event, date, hour, minute))
        .then(addPartitionToSpectrum.bind(null, event, date, hour, minute))
        .then(deleteOldMinuteObjects.bind(null, event))
        .then(deleteStreamObject.bind(null, event))
        .then(result => {
            callback(null, { message: 'done' });
        })
        .catch(err => {
            console.error(err);
            callback(null, { message: err });
        });

Some explanation is in order here. Firehose stores the data in a temporary folder. We then copy the object to another folder that holds the data for the last processed minute; this folder is connected to a small Spectrum table where the data is processed without needing to scan a much larger data set. We also copy the data to a folder that holds the data for the entire hour, to be later aggregated and converted to Parquet.
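
For completeness, here is a sketch of what one of the copy helpers in the chain above can look like. Our actual helpers differ in details, and the destination bucket and folder names here are placeholders:

const AWS = require('aws-sdk');
const s3 = new AWS.S3();

// Copy the Firehose object into the per-minute folder that the small
// "last minute" Spectrum table points at.
function copyObjectToMinuteFolder(event, date, hour, minute) {
    const record = event.Records[0].s3;
    const sourceKey = record.object.key;
    const params = {
        Bucket: 'nuviad-temp',
        CopySource: record.bucket.name + '/' + sourceKey,
        Key: 'events-minute/' + date + '/' + hour + '/' + minute + '/' + sourceKey.split('/').pop()
    };
    return s3.copyObject(params).promise();
}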

Since we partition the data by date and hour, we need to create a new partition on the Spectrum table if the processed minute is the first minute in the hour (i.e. minute 0) by running:

alter table 
   spectrum.events 
add partition
   (date='2017-08-01', hour=0) 
   location 's3://nuviad-temp/events/2017-08-01/0/';

After the data is processed and added to the table, we delete the processed data from the minute folder and from the temporary Firehose storage.
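
The addPartitionToSpectrum step in the chain can be sketched as follows: when the processed minute is minute 0, it runs the ALTER TABLE shown above against the cluster. Again, this is a simplified version with placeholder connection settings, using the node 'pg' module:

const { Client } = require('pg');

// Add the hourly partition when the first minute of a new hour arrives.
function addPartitionToSpectrum(event, date, hour, minute) {
    if (parseInt(minute, 10) !== 0) {
        return Promise.resolve();                 // nothing to do mid-hour
    }
    const sql = "alter table spectrum.events add partition (date='" + date + "', hour=" + hour + ") " +
                "location 's3://nuviad-temp/events/" + date + "/" + hour + "/'";
    const client = new Client({ /* Redshift cluster connection settings */ });
    return client.connect()
        .then(() => client.query(sql))
        .then(() => client.end());
}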

 

Migrating CSV to Parquet using Glue and EMR

As described above, we store the data for hourly aggregation and conversion to Parquet. I will not go too deeply into Parquet (you can read all about it at https://parquet.apache.org/ or https://en.wikipedia.org/wiki/Apache_Parquet); suffice it to say that Parquet is a columnar format that provides superior performance and allows Spectrum (or Athena) to scan significantly less data. As a result, queries run faster and you pay less per query.

The simplest way we found to run an hourly job converting our CSV data to Parquet is using Lambda and Glue (and thanks to the awesome AWS big data team for their help with this).

 

Creating AWS Glue Jobs

 

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME','day_partition_key', 'hour_partition_key', 'day_partition_value', 'hour_partition_value' ])
#day_partition_key = "partition_0"
#hour_partition_key = "partition_1"
#day_partition_value = "2017-08-01"
#hour_partition_value = "0"
day_partition_key = args['day_partition_key']
hour_partition_key = args['hour_partition_key']
day_partition_value = args['day_partition_value']
hour_partition_value = args['hour_partition_value']
print("Running for " + day_partition_value + "/" + hour_partition_value)
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
df = spark.read.option("delimiter","|").csv("s3://nuviad-temp/events/"+day_partition_value+"/"+hour_partition_value)
df.registerTempTable("data")
df1 = spark.sql("select _c0 as user_id, _c1 as campaign_id, _c2 as os, _c3 as ua, cast(_c4 as bigint) as ts, cast(_c5 as double) as billing from data")
df1.repartition(1).write.mode("overwrite").parquet("s3://nuviad-temp/parquet/"+day_partition_value+"/hour="+hour_partition_value)
client = boto3.client('athena', region_name='us-east-1')
response = client.start_query_execution(
    QueryString='alter table parquet_events add if not exists partition(' + day_partition_key + '=\'' + day_partition_value + '\',' + hour_partition_key + '=' + hour_partition_value + ')  location \'s3://nuviad-temp/parquet/' + day_partition_value + '/hour=' + hour_partition_value + '\'' ,
    QueryExecutionContext={
        'Database': 'spectrumdb'
    },
    ResultConfiguration={
        'OutputLocation': 's3://nuviad-temp/convertresults'
    }
)
response = client.start_query_execution(
    QueryString='alter table parquet_events partition(' + day_partition_key + '=\'' + day_partition_value + '\',' + hour_partition_key + '=' + hour_partition_value + ') set location \'s3://nuviad-temp/parquet/' + day_partition_value + '/hour=' + hour_partition_value + '\'' ,
    QueryExecutionContext={
        'Database': 'spectrumdb'
    },

    ResultConfiguration={
        'OutputLocation': 's3://nuviad-temp/convertresults'
    }
)
job.commit()

What this simple AWS Glue script does is:

  1. Get the parameters for the job – the date and hour to be processed
  2. Create a Spark (EMR) context that allows us to run Spark code
  3. Read the CSV data into a DataFrame
  4. Write the data as Parquet to the destination S3 bucket
  5. Add or modify the Spectrum / Athena table partition for the table

 

Note: Since Spectrum and Athena use the same AWS Glue Data Catalog we could use the simpler Athena client to add the partition to the table.

 

A few words about float, decimal and double. Using decimals proved to be more challenging than we expected, as it seems that Spectrum and Spark use them differently. Whenever we used decimal in Spectrum and in Spark, we kept getting errors such as:

 

S3 Query Exception (Fetch). Task failed due to an internal error. File ‘https://s3-external-1.amazonaws.com/nuviad-temp/events/2017-08-01/hour=2/part-00017-48ae5b6b-906e-4875-8cde-bc36c0c6d0ca.c000.snappy.parquet  has an incompatible Parquet schema for column ‘s3://nuviad-events/events.lat’. Column type: DECIMAL(18, 8), Parquet schema:\noptional float lat [i:4 d:1 r:0]\n (https://s3-external-1.amazonaws.com/nuviad-temp/events/2017-08-01/hour=2/part-00017-48ae5b6b-906e-4875-8cde-bc36c0c6d0ca.c000.snappy.parq

 

We had to experiment with a few floating-point formats until we found that the only combination that worked was to define the column as double in the Spark code and as float in Spectrum. This is the reason you see billing defined as float in Spectrum and as double in the Spark code.

 

Creating Lambda function to Trigger Conversion

Next, we created a simple Lambda function to trigger the Glue script hourly, using simple Python code:

import boto3
import json
from datetime import datetime, timedelta
client = boto3.client('glue')

def lambda_handler(event, context):
    last_hour_date_time = datetime.now() - timedelta(hours = 1)
    day_partition_value = last_hour_date_time.strftime("%Y-%m-%d")
    hour_partition_value = last_hour_date_time.strftime("%-H")

    response = client.start_job_run(
    JobName='convertEventsParquetHourly',
    Arguments={
         '--day_partition_key': 'date',
         '--hour_partition_key': 'hour',
         '--day_partition_value': day_partition_value,
         '--hour_partition_value': hour_partition_value
         }
    )

Using CloudWatch Events, we trigger this function hourly. It starts a Glue job named ‘convertEventsParquetHourly’ and runs it for the previous hour, passing the partition keys and values to the Glue job.

 

Redshift Spectrum and Node.JS

Our dev stack is based on Node.js – Node is particularly suited for high-speed, lightweight servers that need to process a huge number of transactions. However, a few limitations of the Node.js environment required us to create workarounds and use other tools to complete the process.

 

Node JS and Parquet

The lack of Parquet modules for Node required us to implement a Glue/EMR process to migrate the data from CSV to Parquet. We would rather have saved directly to Parquet, but we could not find an effective way to do it.

 

One interesting project in the works is a Parquet NPM package by Marc Vertes called node-parquet (https://www.npmjs.com/package/node-parquet), but it is not in a production state yet. It is well worth following the progress of this package.

 

Timestamp Data Type

According to the Parquet documentation, timestamps are stored in Parquet as 64-bit integers. However, JavaScript does not support 64-bit integers, because the native number type is a 64-bit double, giving only 53 bits of integer range.

The result is that you cannot store timestamps correctly in Parquet using Node.js. The solution is to store the timestamp as a string and cast the type to timestamp in the query. Using this method, we did not witness any performance degradation whatsoever.
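
A minimal illustration of the pattern – 'string_ts_clicks' is a hypothetical table whose ts column is declared as varchar:

// Writing: serialize the timestamp as a string instead of a 64-bit integer.
let ts = new Date().toISOString().replace('T', ' ').substring(0, 19);   // e.g. '2017-08-01 14:00:00'

// Reading: cast the string back to a timestamp at query time.
const lastHourSql = `
    select user_id, count(*) as clicks
    from spectrum.string_ts_clicks
    where cast(ts as timestamp) between '2017-08-01 14:00:00' and '2017-08-01 14:59:59'
    group by user_id;
`;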

Lessons Learned

Lesson #1: Data Validation is critical

Data validation is critical. As mentioned above, a single corrupt entry in a partition can fail queries running against that partition. This is even more true when using Parquet, which is harder to edit than a simple CSV file. Make sure you validate your data before scanning it with Redshift Spectrum.

Lesson #2: Structuring and Partitioning Your Data Effectively

One of the biggest benefits of using Redshift Spectrum (or Athena for this matter) is the fact that you don’t need to keep nodes up and running. You pay only for the queries you perform and only for the data scanned per query.

Keeping multiple permutations of your data for different queries makes a lot of sense in this case. For example, you can partition your data by date and hour to run time-based queries, and keep another set partitioned by user_id and date to run user-based queries. This results in faster and more efficient performance of your data warehouse.
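
As a sketch, the two permutations can simply be two external tables defined over two differently laid-out copies of the same events (table names and locations below are hypothetical):

const timePartitionedDdl = `
    create external table spectrum.events_by_time (
        user_id varchar(50), campaign_id varchar(50), billing float
    )
    partitioned by (date date, hour smallint)
    stored as parquet
    location 's3://nuviad-temp/events-by-time/';
`;

const userPartitionedDdl = `
    create external table spectrum.events_by_user (
        campaign_id varchar(50), billing float
    )
    partitioned by (user_id varchar(50), date date)
    stored as parquet
    location 's3://nuviad-temp/events-by-user/';
`;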

Creating Small Tables for Frequent Tasks

Create small tables for commonly used queries. When we started using Spectrum, we saw our Redshift costs jump by hundreds of dollars per day, before realizing that we were scanning a full day's worth of data every minute.

Take advantage of the ability to define multiple tables on the same bucket / folder and create temporary and small tables for frequent queries.

Storing Data in the right format

Use Parquet whenever you can. The benefits of Parquet are substantial: faster performance, less data to scan, and a much more efficient columnar format. However, it is not supported out of the box by Amazon Kinesis Firehose, so you need to implement your own ETL.

Lesson #3: Combining Amazon Athena and Redshift Spectrum for optimal performance

Moving to Redshift Spectrum also allowed us to take advantage of Athena, since both use the same data catalog. Run fast and simple queries using Athena, while taking advantage of the advanced Redshift query engine for complex queries using Spectrum. Spectrum excels at complex queries, where it can push many compute-intensive tasks, such as predicate filtering and aggregation, down to the Redshift Spectrum layer, so queries use much less of your cluster's processing capacity.

Lesson #4: Sort your Parquet data within the partition

Another performance improvement was achieved by sorting the data within each partition using .sortWithinPartitions(sort_field) – for example:

df.repartition(1).sortWithinPartitions("campaign_id")...

Rafi Ton
