While much of the focus in the Julia community has been on the performance aspects of Julia relative to other scientific computing languages, Julia is also perfectly suited to ‘glue’ together multiple data sources/languages. In this blog post, I will cover how to create an interactive plot using Gadfly.jl, by first preparing the data using Hadoop and Teradata Aster via ODBC.jl.
The example problem I am going to solve is calculating and visualizing the number of airplanes in the air, by hour, at any given time in the U.S. for the year 1987. Because of the structure and storage of the underlying data, I will need to write some custom Hive code, upload the data to Teradata Aster via a command-line utility, re-calculate the number of flights per hour using a built-in Aster function, then use Julia to visualize the data.
Step 1: Getting Data From Hadoop
In a prior set of blog posts, I talked about loading the airline dataset into Hadoop, then analyzing the dataset using Hive or Pig. Using ODBC.jl, we can use Hive via Julia to submit our queries. The hardest part of setting up this process is making sure that you have the appropriate Hive drivers for your Hadoop cluster and credentials (which isn’t covered here). Once you have your DSN set up, running Hive queries is as easy as the following:
using ODBC

#Connect to Hadoop cluster via Hive (pre-defined Windows DSN in ODBC Manager)
hiveconn = ODBC.connect("Production hiveserver2"; usr="your-user-name", pwd="your-password-here")

#Clean data, return results directly to file
#Data returned will have origin of flight, flight takeoff, flight landing and elapsed time
hive_query_string = "select
origin,
from_unixtime(flight_takeoff_datetime_origin) as flight_takeoff_datetime_origin,
from_unixtime(flight_takeoff_datetime_origin + (actualelapsedtime * 60)) as flight_landing_datetime_origin,
actualelapsedtime
from
(select
origin,
unix_timestamp(CONCAT(year,\"-\", month, \"-\", dayofmonth, \" \", SUBSTR(LPAD(deptime, 4, 0), 1, 2), \":\", SUBSTR(LPAD(deptime, 4, 0), 3, 4), \":\", \"00\")) as flight_takeoff_datetime_origin,
actualelapsedtime
from vw_airline
where year = 1987 and actualelapsedtime > 0) inner_query;"

#Run query, save results directly to file
query(hive_query_string, hiveconn; output="C:\\airline_times.csv", delim=',')
In this code, I’ve written my query as a Julia string, to keep my code easily modifiable. Then, I pass the Julia string object to the query() function, along with my ODBC connection object. This query runs on Hadoop through Hive, then streams the result directly to my local hard drive, making this a very RAM efficient (though I/O inefficient!) operation.
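To make the date arithmetic in the inner query concrete, here is a minimal Python sketch of the same transformation for a single row. It is not part of the original pipeline. It assumes, as the query does, that deptime is an integer in hhmm form (so 741 means 07:41) and that actualelapsedtime is in minutes. The function name and the sample values are made up for illustration.

```python
from datetime import datetime, timedelta


def takeoff_and_landing(year, month, dayofmonth, deptime, actualelapsedtime):
    """Mirror the Hive expression: pad deptime to 4 digits, build a
    timestamp, then add the elapsed minutes to get the landing time."""
    padded = str(deptime).zfill(4)  # like LPAD(deptime, 4, 0): 741 -> "0741"
    hour, minute = int(padded[:2]), int(padded[2:4])
    takeoff = datetime(year, month, dayofmonth, hour, minute)
    landing = takeoff + timedelta(minutes=actualelapsedtime)
    return takeoff, landing


# Hypothetical flight: departs 07:41 on 1987-10-14 and is airborne for 91 minutes.
takeoff, landing = takeoff_and_landing(1987, 10, 14, 741, 91)
print(takeoff, landing)
```

Note that this sketch does not handle a deptime of 2400, which would raise a ValueError here.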
Step 2: Shelling Out To Load Data To Aster
With my Hadoop results saved to a file, I have a decision point: I can either A) do the rest of the analysis in Julia or B) use a different tool for my calculations. Because this is a toy example, I’m going to do my calculations in Teradata Aster, which provides a convenient function called burst() to regularize timestamps into fixed intervals. But before I can use Aster to ‘burst’ my data, I first need to upload it to the database.
While I could loop over the data within Julia and insert each record one at a time, Teradata provides a command-line utility to upload data in parallel. Running command-line scripts from within Julia is as easy as using the run() command, with each command surrounded in backticks:
#Connect to Aster (pre-defined Windows DSN in ODBC Manager)
asterconn = ODBC.connect("aster01"; usr="your-user-name", pwd="your-password")

#Create table to hold airline results
create_airline_table_statement = "create table temp.airline
(origin varchar,
flight_takeoff_datetime_origin timestamp,
flight_landing_datetime_origin timestamp,
actualelapsedtime int,
partition key (origin))"

#Execute query
query(create_airline_table_statement, asterconn)

#Create airport table
#Data downloaded from http://openflights.org/data.html
create_airport_table_statement = "create table temp.airport
(airport_id int,
name varchar,
city varchar,
country varchar,
IATAFAA varchar,
ICAO varchar,
latitude float,
longitude float,
altitude int,
timezone float,
dst varchar,
partition key (country))"

#Execute query
query(create_airport_table_statement, asterconn)

#Upload data via run() command
#ncluster_loader utility already on Windows PATH
run(`ncluster_loader -h 192.168.1.1 -U your-user-name -w your-password -d aster01 -c --skip-rows=1 --el-enabled --el-table e_dist_error_2 --el-schema temp temp.airline C:\\airline_times.csv`)
run(`ncluster_loader -h 192.168.1.1 -U your-user-name -w your-password -d aster01 -c --el-enabled --el-table e_dist_error_2 --el-schema temp temp.airport C:\\airports.dat`)
While I could’ve run this at the command-line, having all of this within an IJulia Notebook keeps all my work together, should I need to re-run this in the future.
Step 3: Using Aster For Calculations
With my data now loaded in Aster, I can normalize the timestamps to UTC, then ‘burst’ the data into regular time intervals. Again, all of this can be done via ODBC from within Julia:
#Normalize timestamps from local time to UTC time
aster_view_string = "
create view temp.vw_airline_times_utc as
select
row_number() over(order by flight_takeoff_datetime_origin) as unique_flight_number,
origin,
flight_takeoff_datetime_origin,
flight_landing_datetime_origin,
flight_takeoff_datetime_origin - (INTERVAL '1 hour' * timezone) as flight_takeoff_datetime_utc,
flight_landing_datetime_origin - (INTERVAL '1 hour' * timezone) as flight_landing_datetime_utc,
timezone
from temp.airline
left join temp.airport on (airline.origin = airport.iatafaa);"

#Execute query
query(aster_view_string, asterconn)

#Teradata Aster SQL-H functionality, accessed via ODBC query
burst_query_string = "create table temp.airline_burst_hour distribute by hash (origin) as
SELECT
*,
\"INTERVAL_START\"::date as calendar_date,
extract(HOUR from \"INTERVAL_START\") as hour_utc
FROM BURST(
ON (select
unique_flight_number,
origin,
flight_takeoff_datetime_utc,
flight_landing_datetime_utc
FROM temp.vw_airline_times_utc
)
START_COLUMN('flight_takeoff_datetime_utc')
END_COLUMN('flight_landing_datetime_utc')
BURST_INTERVAL('3600')
);"

#Execute query
query(burst_query_string, asterconn)
Since it might not be clear what I’m doing here: the burst() function in Aster takes a row of data with a start and end timestamp and (potentially) returns multiple rows, one per fixed-length interval (here, 3600 seconds) between the two timestamps. If you’re familiar with pandas in Python, the functionality is similar to resample on a series of timestamps.
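As a rough illustration of the idea (not Aster's implementation), the following Python sketch splits one start/end pair into hourly pieces. It assumes that pieces are cut at clock-aligned boundaries, which suits counting planes in the air per clock hour. Whether Aster aligns intervals this way or to each row's own start time is not something the sketch tries to reproduce. The function name and sample flight are hypothetical.

```python
from datetime import datetime, timedelta


def burst(start, end, interval_seconds=3600):
    """Split [start, end) into consecutive pieces, cutting at clock-aligned
    interval boundaries (an assumption of this sketch)."""
    step = timedelta(seconds=interval_seconds)
    # Floor the start time to the interval boundary at or before it.
    boundary = start - ((start - datetime(1970, 1, 1)) % step)
    rows = []
    while boundary < end:
        piece_start = max(start, boundary)
        piece_end = min(end, boundary + step)
        rows.append((piece_start, piece_end))
        boundary += step
    return rows


# Hypothetical flight airborne from 07:41 to 09:12 UTC.
rows = burst(datetime(1987, 10, 14, 7, 41), datetime(1987, 10, 14, 9, 12))
for interval_start, interval_end in rows:
    print(interval_start, interval_end)
```

Under that assumption the single flight becomes three rows, one each for the 07:00, 08:00 and 09:00 hours, so counting rows per hour counts the flight once in every hour it was airborne.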
Step 4: Download Smaller Data Into Julia, Visualize
Now that the data has been processed from Hadoop to Aster through a series of queries, we have a much smaller dataset that can be loaded into RAM and processed by Julia:
#Calculate the number of flights per hour per day
flights_query = "
select
calendar_date,
hour_utc,
sum(1) as num_flights
from temp.airline_burst_hour
group by 1,2
order by 1,2;"

#Bring results into Julia DataFrame
flights_per_day = query(flights_query, asterconn)

using Gadfly

#Create boxplot, with one box plot per hour
set_default_plot_size(20cm, 12cm)
p = plot(flights_per_day, x="hour_utc", y="num_flights",
    Guide.xlabel("Hour UTC"), Guide.ylabel("Flights In Air"),
    Guide.title("Number of Flights In Air To/From U.S. By Hour - 1987"),
    Scale.y_continuous(minvalue=0, maxvalue=4000),
    Geom.boxplot)
The Gadfly code above produces the following plot:
Since this chart is in UTC, the interpretation of the trend might not be obvious. Because the airline dataset represents flights either leaving or returning to the United States, there are many fewer planes in the air overnight and in the early morning hours (UTC 7-10, 2-5am Eastern). During the hours when the airports are open, there appears to be a limit of roughly 2500 planes per hour in the sky.
Why Not Do All Of This In Julia?
At this point, you might be wondering: why go through all of this effort? Couldn’t this all be done in Julia?
Yes, you probably could do all of this work in Julia with a sufficiently large amount of RAM. As a proof-of-concept, I hope I’ve shown that there is much more to Julia than micro-benchmarking Julia’s speed relative to other scientific programming languages. You’ll notice that in none of my code have I used any type annotations, as none would really make sense (nor would they improve performance). And although this is a toy example purposely using multiple systems, I much more frequently use Julia in this manner at work than doing linear algebra or machine learning.
So next time you’re tempted to use Python or R or shell scripting or whatever, consider Julia as well. Julia is just as at-home as a scripting language as a scientific computing language.
EDIT, 9/8/2016: Hive has come a long way in the two years since I wrote this. While some of the code snippets might still work, it’s likely that this information is so out-of-date as to be nothing more than a reflection of working with Hadoop in 2014.
I’ve been spending a ton of time lately on the data engineering side of ‘data science’, so I’ve been writing a lot of Hive queries. Hive is a great tool for querying large amounts of data, without having to know very much about the underpinnings of Hadoop. Unfortunately, there are a lot of things about Hive (version 0.12 and before) that aren’t quite the same as SQL and have caused me a bunch of frustration; here they are, in no particular order.
1. Set Hive Temp directory To Same As Final Output Directory
When doing a “Create Table As” (CTAS) statement in Hive, Hive allocates temp space for the Map and Reduce portions of the job. If you’re not lucky, the temp space for the job will be somewhere different than where your table actually ends up being saved, resulting in TWO I/O operations instead of just one. This can lead to a painful delay in when your Hive job says it is finished vs. when the table becomes available (one time, I saw a 30 hour delay writing 5TB of data).
If your Hive jobs seem to hang after the Job Tracker says they are complete, try this setting at the beginning of your session:
set hive.optimize.insert.dest.volume=true;
2. Column Aliasing In Group By/Order By
Not sure why this isn’t a default, but if you want to be able to reference your columns by position (e.g. group by 1,2) instead of by name (e.g. group by name, age), then run this at the beginning of your session:
set hive.groupby.orderby.position.alias=true;
3. Be Aware Of Predicate Push-Down Rules
In Hive, you can get great performance gains if you A) partition your table by commonly used columns/business concepts (e.g. Day, State, Market) and B) use the partitions in a WHERE clause. These are known as partition-based queries. If you don’t use a partition in your WHERE clause, you will get a full table scan.
Unfortunately, when doing an OUTER JOIN, Hive will sometimes ignore the fact that your WHERE clause is on a partition and do a full table scan anyway. In order to get Hive to push your predicate down and avoid a full table scan, put your predicate on the JOIN instead of the WHERE clause:
If you don’t want to think about the different rules, you can generally put your limiting clauses inside your JOIN clause instead of in your WHERE clause. It should just be a matter of preference (until your query performance indicates it isn’t!).
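One caveat worth checking before moving a predicate: on an OUTER JOIN, a filter on the right-hand table in the JOIN condition and the same filter in the WHERE clause can return different rows. The Python sketch below demonstrates this with an in-memory SQLite database rather than Hive, so it says nothing about Hive's partition pruning. The employees and sales tables, their columns and the dates are all hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table employees (id integer);
    create table sales (employee_id integer, day_id text, amount integer);
    insert into employees values (1), (2);
    insert into sales values (1, '2014-03-15', 100), (2, '2013-12-31', 50);
""")

# Predicate in WHERE: applied after the join, so employee 2 is filtered out.
where_rows = conn.execute("""
    select e.id, s.amount
    from employees e
    left join sales s on (e.id = s.employee_id)
    where s.day_id between '2014-03-01' and '2014-05-31'
    order by e.id
""").fetchall()

# Predicate in ON: applied while matching, so employee 2 is kept with a NULL.
on_rows = conn.execute("""
    select e.id, s.amount
    from employees e
    left join sales s
      on (e.id = s.employee_id and s.day_id between '2014-03-01' and '2014-05-31')
    order by e.id
""").fetchall()

print(where_rows)  # [(1, 100)]
print(on_rows)     # [(1, 100), (2, None)]
```

If the rows with NULLs are not wanted, they can be filtered out afterwards, but it is worth confirming that the two forms of the query return what you expect before treating them as interchangeable.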
4. Calculate And Append Percentiles Using CROSS JOIN
Suppose you want to calculate the top 10% of your customers by sales. If you try to do the following, Hive will complain about needing a GROUP BY, because percentile_approx() is a summary function:
--Hive expects that you want to calculate your percentiles by account_number and sales
--This code will generate an error about a missing GROUP BY statement
select
account_number,
sales,
CASE WHEN sales > percentile_approx(sales, .9) THEN 1 ELSE 0 END as top10pct_sales
To get around the need for a GROUP BY, we can use a CROSS JOIN. A CROSS JOIN is another name for a Cartesian Join, meaning all of the rows from the first table will be joined to ALL of the rows of the second table. Because the subquery only returns one row, the CROSS JOIN provides the desired effect of joining the percentile values back to the original table while keeping the same number of rows from the original table. Generally, you don’t want to do a CROSS JOIN (because relational data generally is joined on a key), but this is a good use case.
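The pattern can be tried outside Hive. The Python sketch below runs it against an in-memory SQLite database. SQLite has no percentile_approx(), so the sketch registers a simple exact nearest-rank percentile under that name as a stand-in; it is not Hive's approximation algorithm. The customer_sales table and its data are hypothetical.

```python
import math
import sqlite3


class NearestRankPercentile:
    """Exact nearest-rank percentile; a stand-in for Hive's percentile_approx()."""

    def __init__(self):
        self.values = []
        self.fraction = None

    def step(self, value, fraction):
        self.values.append(value)
        self.fraction = fraction

    def finalize(self):
        if not self.values:
            return None
        ordered = sorted(self.values)
        rank = max(1, math.ceil(self.fraction * len(ordered)))
        return ordered[rank - 1]


conn = sqlite3.connect(":memory:")
conn.create_aggregate("percentile_approx", 2, NearestRankPercentile)
conn.execute("create table customer_sales (account_number integer, sales integer)")
conn.executemany(
    "insert into customer_sales values (?, ?)",
    [(n, n * 10) for n in range(1, 11)],  # ten accounts with sales 10..100
)

# The subquery returns exactly one row, so the cross join appends the
# percentile to every row without changing the row count.
rows = conn.execute("""
    select
        a.account_number,
        a.sales,
        case when a.sales > b.pct90 then 1 else 0 end as top10pct_sales
    from customer_sales a
    cross join (select percentile_approx(sales, 0.9) as pct90 from customer_sales) b
    order by a.account_number
""").fetchall()

for row in rows:
    print(row)
```

With these ten accounts the stand-in percentile is 90, so only the account with sales of 100 is flagged.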
5. Calculating a Histogram
Creating a histogram using Hive should be as simple as calling the histogram_numeric() function. However, the syntax and results of this function are just plain weird. To create a histogram, you can run the following:
However, now that we have a table of data, it’s still not clear how to create a histogram, as the center of variable-width bins is what is returned by Hive. The Hive documentation for histogram_numeric() references Gnuplot, Excel, Mathematica and MATLAB, which I can only assume can deal with plotting the centers? Eventually I’ll figure out how to deal with this using R or Python, but for now, I just use the table as a quick gauge of what the data looks like.
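One possible way to work with the centers, sketched here in Python, is to treat the midpoints between neighbouring centers as bin edges and mirror the outermost edges around the first and last centers. This is only one convention for recovering bin widths, not something the Hive documentation prescribes. It assumes the histogram has already been pulled out of Hive as a list of (x, y) pairs sorted by x, with at least two bins. The function name and sample values are made up.

```python
def centers_to_bins(points):
    """Turn sorted (center, height) pairs into (left_edge, right_edge, height)
    triples, placing edges halfway between neighbouring centers."""
    centers = [x for x, _ in points]
    # Interior edges sit halfway between adjacent centers.
    inner = [(a + b) / 2 for a, b in zip(centers, centers[1:])]
    # Outer edges mirror the nearest interior edge around the end centers.
    left = centers[0] - (inner[0] - centers[0])
    right = centers[-1] + (centers[-1] - inner[-1])
    edges = [left] + inner + [right]
    return [(edges[i], edges[i + 1], y) for i, (_, y) in enumerate(points)]


# Hypothetical histogram output: three variable-width bins.
bins = centers_to_bins([(1.0, 5), (3.0, 10), (8.0, 2)])
for left_edge, right_edge, height in bins:
    print(left_edge, right_edge, height)
```

The resulting edges and heights can then be handed to any plotting function that accepts per-bar widths.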
When I set out to build RSiteCatalyst, I had a few major goals: learn R, build a CRAN-worthy package and learn the Adobe Analytics API. As I reflect back on how the package has evolved over the past two years and what I’ve learned, I think my greatest learning was around how to deal with JSON (and strings in general).
JSON is ubiquitous as a data-transfer mechanism over the web, and R does a decent job providing the functionality to not only read JSON but also to create JSON. There are at least three methods I know of to build JSON strings, and this post will cover the pros and cons of each method.
Method 1: Building JSON using paste
As a beginning R user, I didn’t have the awareness of how many great user-contributed packages are out there. So throughout the RSiteCatalyst source code you can see gems like:
#"metrics" would be a user input into a function arguments
metrics <- c("a", "b", "c")

#Loop over the metrics list, appending proper curly braces
metrics_conv <- lapply(metrics, function(x) paste('{"id":', '"', x, '"', '}', sep=""))

#Collapse the list into a proper comma separated string
metrics_final <- paste(metrics_conv, collapse=", ")
The code above loops over a character vector (using lapply instead of a for loop like a good R user!), appending curly braces, then flattening the list down to a string. While this code works, it’s quite a brittle way to build JSON. You end up needing to worry about matching quotation marks, remembering if you need curly braces, brackets or singletons…overall, it’s a maintenance nightmare to build strings this way.
Of course, if you have a really simple JSON string you need to build, paste() doesn’t have to be off-limits, but for a majority of the cases I’ve seen, it’s probably not a good idea.
Method 2: Building JSON using sprintf
Somewhere in the middle of building version 1 of RSiteCatalyst, I started learning Python. For those of you who aren’t familiar, Python has a string interpolation operator, %, which allows you to do things like the following:
In [1]: print "Here's a string substitution for my name: %s" % ("Randy")
Out[1]: "Here's a string substitution for my name: Randy"
Thinking that this was the most useful thing I’d ever seen in programming, I naturally searched to see if R had the same functionality. Of course, I quickly learned that all C-based languages have printf/sprintf, and R is no exception. So I started building JSON using sprintf in the following manner:
In this example, we’re now passing R objects into the sprintf() function, with %s tokens everywhere we need to substitute text. This is certainly an improvement over paste(), especially given that Adobe provides example JSON via their API explorer. So I copied the example strings, replaced their examples with my tokens and voilà! Better JSON string building.
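The same token-substitution idea can be sketched with Python's % operator, since that is where the inspiration came from. This is an illustration only, not the RSiteCatalyst code. The template reuses the "rsid_list" and "id" keys that appear elsewhere in this post, and the substituted values are hypothetical. The second half shows the remaining weakness of templates: a value containing a double quote silently produces invalid JSON.

```python
import json

# Template with %s tokens wherever a value needs to be substituted.
template = '{"rsid_list": ["%s"], "metric": {"id": "%s"}}'

# Plain values substitute cleanly and the result parses as JSON.
body = template % ("my-report-suite", "pageviews")
parsed = json.loads(body)
print(parsed)

# A value containing a double quote is pasted in unescaped.
broken = template % ('suite "A"', "pageviews")
try:
    json.loads(broken)
    is_valid = True
except json.JSONDecodeError:
    is_valid = False
print(is_valid)  # False
```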
Method 3: Building JSON using a package (jsonlite, rjson or RJSONIO)
While sprintf() allowed for much easier JSON, there is still a frequent code smell in RSiteCatalyst, as evidenced by the following:
#Converts report_suites to JSON
if(length(report_suites) > 1){
  report_suites <- toJSON(report_suites)
} else {
  report_suites <- toJSON(list(report_suites))
}

#API request
json <- postRequest("ReportSuite.GetTrafficVars", paste('{"rsid_list":', report_suites, '}'))
At some point, I realized that using the toJSON() function from rjson would take care of formatting R objects as strings, yet I didn’t make the leap to understanding that I could build the whole string using R objects translated by toJSON()! So I have more hard-to-maintain code where I’m checking the class/length of objects and formatting them. The efficient way to do this using rjson would be:
With the code above, we’re building JSON in a very R-looking manner; just R objects and functions, and in return getting the output we want. While it’s slightly less obvious what is being created by request.body, there’s literally zero bracket-matching, quoting issues or anything else to worry about in building our JSON. That’s not to say that there isn’t a learning curve to using a JSON package, but I’d rather figure out whether I need a character vector or list than burn my eyes out looking for mismatched quotes and brackets!
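For readers coming from Python, the same principle (build native objects, then let a library serialize them) looks like the sketch below, using the standard json module. It is an analogue of the approach rather than the R code itself. The function name and the report suite IDs are hypothetical, and only the "rsid_list" key comes from the request shown earlier.

```python
import json


def build_request_body(report_suites):
    """Serialize a list of report suite IDs; quoting, escaping and
    bracket-matching are left entirely to the json module."""
    return json.dumps({"rsid_list": report_suites})


# A one-element list still serializes as a JSON array, and embedded
# quotes are escaped automatically.
single = build_request_body(["a"])
tricky = build_request_body(["a", 'suite "B"'])
print(single)  # {"rsid_list": ["a"]}
print(tricky)
```

Because the structure lives in lists and dictionaries rather than in a string, there is no special case for a single report suite and no hand-written quoting to get wrong.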
Collaborating Makes You A Better Programmer
Like any pursuit, you can get pretty far on your own through hard work and self-study. However, I wouldn’t be nearly where I am without collaborating with others (especially learning about how to build JSON properly in R!). A majority of the RSiteCatalyst code for the upcoming version 1.4 was re-written by Willem Paling, where he added consistency to keyword arguments, switched to jsonlite for better JSON parsing to Data Frames, and most importantly for the topic of this post, cleaned up the method of building all the required JSON strings!
Edit 5/13: For a more thorough example of building complex JSON using jsonlite, check out this example from the v1.4 branch of RSiteCatalyst. The linked example R code populates the required arguments from this JSON outline provided by Adobe.