S3 and Athena example

So we have some sample data from AWS Data Exchange:

https://console.aws.amazon.com/dataexchange/home?region=us-east-1#/products/prodview-stk4wn3mbhx24

And we’ve subscribed and downloaded it to our S3 bucket:

s3://rockstaretltesting1/sample_data/shipping/d0e9bd6148e8f14889980954017b0927/83bdd5c3225a70b48acff63e86c74193/ams/2020/202009291500/

So the first thing we have to do is decide which file we’re going to use and copy it to its own folder. This is because we can’t create an Athena table pointing to the original location, as the files there have different schemas (structures, i.e. column names and data types).

So I’m choosing the header table and copying it to its own folder with this command:

aws s3 cp s3://rockstaretltesting1/sample_data/shipping/d0e9bd6148e8f14889980954017b0927/83bdd5c3225a70b48acff63e86c74193/ams/2020/202009291500/ams__header_2020__202009291500.csv s3://rockstaretltesting1/sample_data/shipping/d0e9bd6148e8f14889980954017b0927/83bdd5c3225a70b48acff63e86c74193/ams/2020/202009291500/header/

I’m running it in PowerShell:

So now I have the header data in its own subfolder:

Now I can create an external Athena table to point to this subfolder:

So I’m running the following commands in Athena:

create database shipping;
drop table header;
CREATE EXTERNAL TABLE IF NOT EXISTS header (
explore String
)
ROW FORMAT DELIMITED
LOCATION 's3://rockstaretltesting1/sample_data/shipping/d0e9bd6148e8f14889980954017b0927/83bdd5c3225a70b48acff63e86c74193/ams/2020/202009291500/header/';
select * from header limit 10;

I don’t know the column names yet, so I’ve just created the single-column table above so I can at least see the data:

The table now exists in my database:
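
As an aside (not one of the original steps), you can also confirm this from the Athena query editor:

show tables in shipping;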

The data is not split into different columns yet:

So I copy the first row into a new spreadsheet (I’m using Google Sheets):

The spreadsheet is here:

https://docs.google.com/spreadsheets/d/1pA7vodxHJTAUO25W_x18KkI6ME6APZIx6MlB0G5tLcA/edit?usp=sharing

After copying the row into A4, I then split the text to columns in A8:

Thereafter I copy the split row and paste it transposed into A12 and down:

Thereafter, I name the columns arbitrarily F1, F2 and so on, and use a formula to concatenate the column type “String” onto each name:

I then copy this into Athena to create my table:

CREATE EXTERNAL TABLE IF NOT EXISTS headersplit (
F1 String,
F2 String,
F3 String,
F4 String,
F5 String,
F6 String,
F7 String,
F8 String,
F9 String,
F10 String,
F11 String,
F12 String,
F13 String,
F14 String,
F15 String,
F16 String,
F17 String,
F18 String,
F19 String,
F20 String,
F21 String,
F22 String,
F23 String,
F24 String,
F25 String,
F26 String,
F27 String,
F28 String,
F29 String,
F30 String,
F31 String,
F32 String,
F33 String,
F34 String
)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
'escapeChar'='\\',
'quoteChar'='"',
'separatorChar'=',')
LOCATION 's3://rockstaretltesting1/sample_data/shipping/d0e9bd6148e8f14889980954017b0927/83bdd5c3225a70b48acff63e86c74193/ams/2020/202009291500/header/';

I use the OpenCSVSerde because the data contains quoted fields, as per:

https://docs.aws.amazon.com/athena/latest/ug/csv-serde.html

i.e.:

ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
'escapeChar'='\\',
'quoteChar'='"',
'separatorChar'=',')

So now we can see the data split by column:
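
Presumably via the same kind of quick select as before, just against the new table:

select * from headersplit limit 10;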

So let’s do some data profiling:

First let’s see how many rows there are per f3:

select f3, count(1) as numrows
from headersplit
group by f3
order by 2 desc;

Notice that 4 GB of data was scanned, i.e. the whole table:

Also notice there are more than 100 distinct values, so if we wanted to partition the data by this column we could not do it in one query, as we’d hit the 100-partition limit per:

https://docs.aws.amazon.com/athena/latest/ug/ctas-insert-into.html
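
A quick sanity check of the number of distinct values (not in the original walkthrough, just an easy way to confirm):

select count(distinct f3) as numvalues from headersplit;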

The next thing we want to do is convert the data to Parquet format to get some scanning efficiencies, per:

https://docs.aws.amazon.com/athena/latest/ug/convert-to-columnar.html

https://docs.aws.amazon.com/athena/latest/ug/ctas-insert-into-etl.html

Let’s use this format, per https://docs.aws.amazon.com/athena/latest/ug/ctas-examples.html:
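
This creates the headersplitcompressed table. The CTAS statement is presumably along these lines (the same statement reappears later, with a filter added, when we recreate the table):

CREATE TABLE headersplitcompressed
WITH (
format = 'Parquet',
parquet_compression = 'SNAPPY')
AS SELECT *
FROM headersplit;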

Now if we run the same query as before (but on the new table):

Wow! That is a huge saving in data scanned! We went from 4 GB down to just 7 MB!

The query time also went down from 3 seconds to 1.31 seconds!

Converting to columnar storage formats is clearly an important strategy!

It’s also interesting to see where the new table is located in S3 and its size:

Athena has split data into separate smaller files:

Now let’s partition the table by the f3 column:

If we try to run the following query (notice we’re specifying the table location in S3 this time):

CREATE TABLE headersplitcompressedpartitioned
WITH (
format = 'Parquet',
parquet_compression = 'SNAPPY',
partitioned_by = ARRAY['f3'],
external_location = 's3://rockstaretltesting1/sample_data/shipping/d0e9bd6148e8f14889980954017b0927/83bdd5c3225a70b48acff63e86c74193/ams/2020/202009291500/header/compressed/' )
AS SELECT F1,
F2,
F4,
F5,
F6,
F7,
F8,
F9,
F10,
F11,
F12,
F13,
F14,
F15,
F16,
F17,
F18,
F19,
F20,
F21,
F22,
F23,
F24,
F25,
F26,
F27,
F28,
F29,
F30,
F31,
F32,
F33,
F34,
F3
FROM headersplitcompressed;

We get the following error because there are too many partitions, i.e. more than 100:

One option is the manual approach: filter on the f3 column in the where clause, for example:

where f3 between 'AD' and 'FO';

However, this is a manual approach that we’d have to repeat for each batch of partition values.

Let’s try an automated approach using rockstarETL’s batch partition insert job:

First we need to create the partitioned table before we run the insert into statements:

Let’s get the partition column values:
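
With a query something like this (the same query reappears later with a row_number added):

SELECT DISTINCT f3
FROM shipping.headersplitcompressed
ORDER BY 1;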

So we’ll create the partitioned table by filtering on the “AD” f3 column value:

Notice that f3 has to be the last column in our select statement because it is now a partitioning column:

CREATE TABLE headersplitcompressedpartitioned
WITH (
format = 'Parquet',
parquet_compression = 'SNAPPY',
partitioned_by = ARRAY['f3'],
external_location = 's3://rockstaretltesting1/sample_data/shipping/d0e9bd6148e8f14889980954017b0927/83bdd5c3225a70b48acff63e86c74193/ams/2020/202009291500/header/compressed/' )
AS SELECT F1,
F2,
F4,
F5,
F6,
F7,
F8,
F9,
F10,
F11,
F12,
F13,
F14,
F15,
F16,
F17,
F18,
F19,
F20,
F21,
F22,
F23,
F24,
F25,
F26,
F27,
F28,
F29,
F30,
F31,
F32,
F33,
F34,
F3
FROM headersplitcompressed
where f3 = 'AD';

If we look in S3 we see the new partition prefix (subfolder):

And the table now exists in Athena:

So let’s construct our insert into statement:

insert into headersplitcompressedpartitioned
(F1,
F2,
F4,
F5,
F6,
F7,
F8,
F9,
F10,
F11,
F12,
F13,
F14,
F15,
F16,
F17,
F18,
F19,
F20,
F21,
F22,
F23,
F24,
F25,
F26,
F27,
F28,
F29,
F30,
F31,
F32,
F33,
F34,
F3) 
select
F1,
F2,
F4,
F5,
F6,
F7,
F8,
F9,
F10,
F11,
F12,
F13,
F14,
F15,
F16,
F17,
F18,
F19,
F20,
F21,
F22,
F23,
F24,
F25,
F26,
F27,
F28,
F29,
F30,
F31,
F32,
F33,
F34,
F3
from headersplitcompressed
where f3 = 'AE';

Let’s run the query to test and confirm it will work:

So now we have two partitions already loaded (check in S3):

(Remember to refresh; it won’t refresh on its own.)
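
As an aside, besides browsing S3 you can also list the loaded partitions directly in Athena:

show partitions headersplitcompressedpartitioned;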

We can delete this data before we run our job in rockstarETL because we don’t want the data duplicated:

So now we can copy our insert query to S3:

This is achieved by saving the query in a text file and uploading it to S3:

insert into headersplitcompressedpartitioned
(F1,
F2,
F4,
F5,
F6,
F7,
F8,
F9,
F10,
F11,
F12,
F13,
F14,
F15,
F16,
F17,
F18,
F19,
F20,
F21,
F22,
F23,
F24,
F25,
F26,
F27,
F28,
F29,
F30,
F31,
F32,
F33,
F34,
F3)
select
F1,
F2,
F4,
F5,
F6,
F7,
F8,
F9,
F10,
F11,
F12,
F13,
F14,
F15,
F16,
F17,
F18,
F19,
F20,
F21,
F22,
F23,
F24,
F25,
F26,
F27,
F28,
F29,
F30,
F31,
F32,
F33,
F34,
F3
from headersplitcompressed

This is how this single file would appear in the rockstarETL batch partition insert job type:

This is how the rest of the columns would look:

The partitioning column “f3” is saved as f3 in a file and uploaded to S3, i.e.:

Our saved job in rockstarETL:

Now let’s set the job to run:

By specifying the RunTime (in UTC) and the Number of Runs per day:

The pipeline now shows:

So if we inspect the logs:

We have an error: “The specified bucket is not valid”

The pipeline fails after checking a few times:

We correct the pipeline by removing the “S3://” prefix:

We try the pipeline again by re-setting the time and get a different error:

The pipeline checks a few times and fails again:

This error means our insert statement was poorly constructed:

Instead of

We need to change it to:

So we change the file and re-upload it to S3:

Let’s try run our pipeline again:

So now, the previous error has gone away and it’s correctly detecting our partitioning column name:

But we still get an error:

If we inspect the log a little higher up:

We can re-run the query used to calculate the partitioning column values:

select row_number() over(order by f3) as rn, f3
from (
SELECT DISTINCT f3
FROM shipping.headersplitcompressed
ORDER BY 1);

And we notice a symptom of our dirty data:

Let’s see if we can clean up the dirty data and re-run our pipeline:

I’m deleting and recreating our compressed table to exclude the empty or null partitioning value:

drop table headersplitcompressed;
CREATE TABLE headersplitcompressed
WITH (
format = 'Parquet',
parquet_compression = 'SNAPPY')
AS SELECT *
FROM headersplit
where f3 != '' and f3 is not null;

Still the error persists:

Which is a strange one because the column is definitely in our destination table:

SELECT *
FROM information_schema.columns
WHERE table_schema = 'shipping'
AND table_name = 'headersplitcompressedpartitioned';

But a closer inspection of the logs reveals something strange about the destination table:

A closer inspection of our insert into query reveals there was no space after the table name. This is important.

So after correcting this (adding the space) and re-uploading to S3, let’s re-run the pipeline and see:

Now before we proceed, let’s confirm that the compressed table agrees with the source table with some profiling:

(Notice again the data scanned and time the queries take to complete)

First let’s check the original uncompressed table:

Now let’s check the row count on the compressed table:
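
Both checks are presumably simple counts along these lines:

select count(*) as numrows from headersplit;
select count(*) as numrows from headersplitcompressed;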

Notice there is a large difference.

Also, if we just select the top 10 rows, look at the result:

We’re getting gobbledygook because we are actually querying compressed data! This is because our compressed partitioned table’s data is stored in a subfolder of the original table’s location, so the original table scans those Parquet files as if they were CSV!

That we’re scanning compressed data is even more obvious here:

So we need to be more careful about where we create our compressed partitioned table!

Our headersplit table is located at:

show create table headersplit;

LOCATION
  's3://rockstaretltesting1/sample_data/shipping/d0e9bd6148e8f14889980954017b0927/83bdd5c3225a70b48acff63e86c74193/ams/2020/202009291500/header'

while the compressed partitioned table headersplitcompressedpartitioned is located at:

show create table headersplitcompressedpartitioned;

LOCATION
  's3://rockstaretltesting1/sample_data/shipping/d0e9bd6148e8f14889980954017b0927/83bdd5c3225a70b48acff63e86c74193/ams/2020/202009291500/header/compressed/'

So we need to fix this by clearing out the S3 location using an S3 delete objects job type in rockstarETL:

To watch the process live we SSH into the server and at the terminal use the following command:

sudo tail -f /var/log/tomcat/catalina.out

Now our S3 location is clear and no longer has the compressed subfolder:

We then disable the job:

and then re-run the Athena batch partition insert job:

But first we must change the location of the table in Athena:

First we drop the table:

drop table headersplitcompressedpartitioned;

Then we re-create it:

CREATE TABLE headersplitcompressedpartitioned
WITH (
format = 'Parquet',
parquet_compression = 'SNAPPY',
partitioned_by = ARRAY['f3'],
external_location = 's3://rockstaretltesting1/sample_data/shipping/d0e9bd6148e8f14889980954017b0927/83bdd5c3225a70b48acff63e86c74193/ams/2020/202009291500/header_compressed_partitioned/' )
AS SELECT F1,
F2,
F4,
F5,
F6,
F7,
F8,
F9,
F10,
F11,
F12,
F13,
F14,
F15,
F16,
F17,
F18,
F19,
F20,
F21,
F22,
F23,
F24,
F25,
F26,
F27,
F28,
F29,
F30,
F31,
F32,
F33,
F34,
F3
FROM headersplitcompressed
where f3 = 'AD';

And the table now exists again in our database:

Now our table data is in its own subfolder and won’t interfere with our original table when we profile it: ‘s3://rockstaretltesting1/sample_data/shipping/d0e9bd6148e8f14889980954017b0927/83bdd5c3225a70b48acff63e86c74193/ams/2020/202009291500/header_compressed_partitioned/’

show create table headersplitcompressedpartitioned;

Now our tables are a lot closer when profiled:

What accounts for the difference?

Recall we had an empty/null f3 value in the original table:

The difference is exactly that:
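
A hypothetical query to confirm this against the original table (the count of empty or null f3 values should equal the difference):

select count(*) as numrows
from headersplit
where f3 = '' or f3 is null;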

Now that our batch partition insert job has run, we can again profile our two tables to confirm they agree.

First our compressed but not partitioned table:

Then our partitioned table:

They both agree!

Let’s perform a slightly more detailed profiling:
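
Presumably this compares row counts per f3 value across the two tables, along these lines:

select f3, count(1) as numrows
from headersplitcompressed
group by f3
order by 1;

select f3, count(1) as numrows
from headersplitcompressedpartitioned
group by f3
order by 1;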

All looks good!