Wednesday, June 1, 2016

BigInsights 4 - Running later versions of Spark

With BigInsights you get what you get in terms of the versions of the components that ship in the box. I'm currently on BigInsights 4.1, which ships with Spark 1.4.1, and I needed to quickly test something on the cluster with Spark 1.6.1. What I did is not a supported configuration, but it got me going quickly and relatively simply. YMMV.

First I downloaded the Spark binaries from Apache that are pre-built against the right version of Hadoop. I used the following link:

http://spark.apache.org/downloads.html

I selected

"1.6.1" as the Spark release
"Pre-built for Hadoop 2.6 or later" as the package type

This gave me spark-1.6.1-bin-hadoop2.6.tgz. I checked the signatures and unpacked it in my home directory on the cluster. My home directory happens to be mounted on all of the nodes in the cluster.
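
For reference, the fetch and unpack steps were roughly as follows. The archive URL and signature file name are illustrative; take the actual links from the downloads page, and import the Spark release KEYS before verifying.

wget https://archive.apache.org/dist/spark/spark-1.6.1/spark-1.6.1-bin-hadoop2.6.tgz
wget https://archive.apache.org/dist/spark/spark-1.6.1/spark-1.6.1-bin-hadoop2.6.tgz.asc

# check the signature, then unpack into my home directory (mounted on every node)
gpg --verify spark-1.6.1-bin-hadoop2.6.tgz.asc spark-1.6.1-bin-hadoop2.6.tgz
mkdir -p ~/spark
tar -xzf spark-1.6.1-bin-hadoop2.6.tgz -C ~/spark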

I then wrote a script called spark-submit.sh to use in place of the spark-submit command that ships with Spark 1.4.1 on BigInsights 4.1. It has the following contents:

#!/bin/bash
export SPARK_HOME=/home/me/spark/spark-1.6.1-bin-hadoop2.6
export PATH=$SPARK_HOME/bin:$PATH

# Options read in YARN client mode
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
export HADOOP_CONF_DIR=/etc/hadoop/conf
# - SPARK_EXECUTOR_INSTANCES, Number of executors to start (Default: 2)
export SPARK_EXECUTOR_INSTANCES=1

$SPARK_HOME/bin/spark-submit --master yarn-client "$@"


Note that here we are just setting the environment to point to the 1.6.1 install and, for testing purposes, using just one executor. The script then calls through to the 1.6.1 spark-submit program, which in turn uses the BigInsights YARN install to fire up containers for the Spark job you are submitting.
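
Submitting a job through the wrapper then looks just like using the stock spark-submit. For example (the class and jar names below are placeholders for whatever you are testing):

chmod +x spark-submit.sh
./spark-submit.sh --class com.example.MyTestJob /home/me/jobs/my-test-job.jar arg1 arg2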

Note that the Spark distribution directory contains a conf directory. In my case:

/home/me/spark/spark-1.6.1-bin-hadoop2.6/conf

Two files need to be created there. When I first tried to run some jobs I was getting the error:

Error: Could not find or load main class org.apache.spark.deploy.yarn.ApplicationMaster

To solve this I did the following:

cd /home/me/spark/spark-1.6.1-bin-hadoop2.6/conf
cp spark-defaults.conf.template spark-defaults.conf
vi spark-defaults.conf

and added:

spark.executor.extraJavaOptions  -Diop.version=4.1.0.0
spark.driver.extraJavaOptions    -Diop.version=4.1.0.0
spark.yarn.am.extraJavaOptions   -Diop.version=4.1.0.0

Then:

touch java-opts

and added:

-Diop.version=4.1.0.0

After this my example program ran successfully.
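
As a quick sanity check you can also push one of the bundled examples through the wrapper script; the exact name of the examples jar depends on the build, so check what is actually under $SPARK_HOME/lib:

./spark-submit.sh --class org.apache.spark.examples.SparkPi \
    $SPARK_HOME/lib/spark-examples-1.6.1-hadoop2.6.0.jar 10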

I can now choose which version of Spark I want to run with, and I have not changed the Ambari-based configuration of BigInsights. This is not ideal if you want a consistent cluster, but it is fine for me running a few quick tests.

Monday, February 22, 2016

Formatting JSON Files in Eclipse

This is just a short post so that next time I don't forget the following.

I've just moved to the Eclipse Mars release, and one of the first things I did with the new install was to open a JSON file, only to find that I didn't have the option to format the JSON from being stretched out on a single line into something pretty-printed that I could read.

I had forgotten that I had previously installed the json-tools package from the Eclipse Marketplace.
You go to the listing, drag the install button onto your workspace, and the install kicks off automatically. Simple! Now I have the right-click "Format JSON" option back again.

Monday, January 11, 2016

Aggregating time series events using DB2 SQL

Someone asked me for some SQL to aggregate up some data from a home energy monitor. In this particular case it's a plug controlling a microwave oven and measuring the power that it uses. The input data looks something like the following:

Dev    Meas      Timestamp              Power
Plug    power    1449660961833    109.6
Plug    power    1449660963225    209.2
Plug    power    1449660964291    509.9
Plug    power    1449660965296    1199.2
Plug    power    1449660966530    1197
Plug    power    1449660967766    1195.6
Plug    power    1449660969023    1196.1
Plug    power    1449660970216    1199
Plug    power    1449660972727    1198.1
Plug    power    1449660973929    1197.1
Plug    power    1449660975130    1193.9
Plug    power    1449660976376    667.3
Plug    power    1449660977589    174.8
Plug    power    1449660978800    76.3
Plug    power    1449660980033    51.8
Plug    power    1449660986193    11.7
Plug    power    1449660987398    3.7
Plug    power    1449660988799    1.9
Plug    power    1449661288372    211.8
Plug    power    1449661289734    478
Plug    power    1449661290753    1216.7
Plug    power    1449661292024    1204.9
Plug    power    1449661293171    1202
Plug    power    1449661298122    1201.5
Plug    power    1449661302994    670.6
Plug    power    1449661304224    175.7
Plug    power    1449661305498    76.8


The objective is to translate this into

Start ts                    End ts                  On/Off    Total Power
1449660961833    1449660986193    1    11386.6
1449660987398    1449660988799    0    5.6
1449661288372    1449661312834    1    6534.3
1449661314041    1449666826229    0    24.0
1449667136265    1449667196427    1    39542.4
1449667197658    1449667197658    0    1.7
1449667598249    1449667647221    1    24923.4
1449667648462    1449670435807    0    12.3
1449670609718    1449670646433    1    19924.9
1449670647639    1449670650052    0    7.5
1449671163432    1449671200244    1    18427.8
1449671201449    1449674183396    0    16.0
1449674184637    1449674184637    1    12.6
1449674190220    1449674212614    0    1.4
1449674245143    1449674246497    1    21.4
1449674251344    1449674251344    0    0

This kind of thing is easy to do in SQL using OLAP functions, assuming you have a unique grouping key for each data partition you want to group over. The trick here is to generate that key so that each on and off period for the microwave is uniquely identified. This is what I ended up with. It seems long-winded, so I want to see if I can simplify it.

-- mark each row as plug on (1) or plug off (0)
with state_data as (
  select ts,
         event,
         case
           when cast(event as integer) > 5 then 1  -- set the on/off threshold value here
           else 0
         end as plug_state
  from events
  where device = 'Plug' and
        name = 'power'
  order by ts asc
),
-- add some information from the previous row to each row
lag_data as (
  select lag(plug_state, 1,0) over ( order by ts) as last_period_plug_state,
         lag(ts, 1,0) over ( order by ts) as last_period_end_ts,
         ts as period_start_ts,
         event as period_event,
         plug_state as period_plug_state
  from state_data
  order by ts asc
),
-- select just the rows where the plug changes from on to off or from off to on
transition_data as (
  select last_period_end_ts, period_start_ts, period_plug_state from lag_data
  where (period_plug_state = 0 and last_period_plug_state = 1) or
        (period_plug_state = 1 and last_period_plug_state = 0)
),
-- create a set of rows that describe the start and end of each on or off period
period_data as (
  select lag(period_start_ts, 1,0) over ( order by period_start_ts) as period_start_ts,
        last_period_end_ts as period_end_ts
  from transition_data
),
-- annotate each row in the full table with the time of the period to which it belongs
annotated_state_data as (
  select *, p.period_start_ts as period_id
  from state_data s, period_data p
  where s.ts >= p.period_start_ts and
        s.ts <= p.period_end_ts
)
-- finally aggregate up each period
select avg(period_start_ts) as start_ts,
       avg(period_end_ts) as end_ts,
       avg(plug_state) as on_off,
       sum(event) as total_power
  from annotated_state_data
  group by period_id
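
For what it's worth, I run this from the DB2 command line processor with the statement saved in a file. The database name and file name here are just placeholders, and the statement needs a terminating semicolon for the -t option:

db2 connect to MYDB
db2 -tvf aggregate_power.sql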