Wednesday, June 1, 2016

BigInsights 4 - Running later versions of Spark

With BigInsights you get what you get in terms of the versions of components that are shipped in the box. I'm on BigInsights 4.1 currently, which ships with Spark 1.4.1, and I needed to quickly test something on the cluster with Spark 1.6.1. What I did is not a supported configuration but it got me going quickly and relatively simply. YMMV.

Firstly I downloaded the Spark binaries from Apache that are built against the right versions of Hadoop etc. I used the following link:

http://spark.apache.org/downloads.html

I selected

"1.6.1" as the Spark release
"Pre-built for Hadoop 2.6 or later" as the package type

This gave me spark-1.6.1-bin-hadoop2.6.tgz. I checked the signatures and unpacked it in my home directory on the cluster. My home directory happens to be mounted on all of the nodes in the cluster.
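For reference, the download-and-verify step was something like the following (the exact archive URLs are my assumption based on the standard Apache layout for the 1.6.1 release):

# fetch the tarball, its signature and the project KEYS file, then verify and unpack
cd ~/spark
wget https://archive.apache.org/dist/spark/spark-1.6.1/spark-1.6.1-bin-hadoop2.6.tgz
wget https://archive.apache.org/dist/spark/spark-1.6.1/spark-1.6.1-bin-hadoop2.6.tgz.asc
wget https://archive.apache.org/dist/spark/KEYS
gpg --import KEYS
gpg --verify spark-1.6.1-bin-hadoop2.6.tgz.asc spark-1.6.1-bin-hadoop2.6.tgz
tar -xzf spark-1.6.1-bin-hadoop2.6.tgz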

I then wrote a script called spark-submit.sh to use in place of the spark-submit command that ships with Spark 1.4.1 from BigInsights 4.1. It has the following contents:

export SPARK_HOME=/home/me/spark/spark-1.6.1-bin-hadoop2.6
export PATH=$SPARK_HOME/bin:$PATH

# Options read in YARN client mode
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
export HADOOP_CONF_DIR=/etc/hadoop/conf
# - SPARK_EXECUTOR_INSTANCES, Number of executors to start (Default: 2)
export SPARK_EXECUTOR_INSTANCES=1

$SPARK_HOME/bin/spark-submit --master yarn-client "$@"


Note that here we are just setting the environment to point to the 1.6.1 install and, for testing purposes, just using one executor. It then calls through to the 1.6.1 spark-submit program, which in turn uses the BigInsights YARN install to fire up containers for the Spark job you are submitting.
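With that in place, submitting a job against 1.6.1 is just a case of calling the wrapper instead of the stock spark-submit, for example (the class and jar names here are just placeholders):

chmod +x ~/spark-submit.sh
~/spark-submit.sh --class com.example.MyApp /home/me/jobs/my-spark-app.jar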

Note that the spark distribution directory contains a conf directory. In my case

/home/me/spark/spark-1.6.1-bin-hadoop2.6/conf

Two files need to be created there. When I first tried to run some jobs I was getting the error:

Error: Could not find or load main class org.apache.spark.deploy.yarn.ApplicationMaster

To solve this I did the following:

cd /home/me/spark/spark-1.6.1-bin-hadoop2.6/conf

cp spark-defaults.conf.template spark-defaults.conf

vi spark-defaults.conf

and add:

spark.executor.extraJavaOptions -Diop.version=4.1.0.0
spark.driver.extraJavaOptions -Diop.version=4.1.0.0
spark.yarn.am.extraJavaOptions -Diop.version=4.1.0.0

Then:

touch java-opts

and add:

-Diop.version=4.1.0.0
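For the record, the whole conf change can be scripted in one go, something like this (same paths and iop.version value as above):

cd /home/me/spark/spark-1.6.1-bin-hadoop2.6/conf
cp spark-defaults.conf.template spark-defaults.conf
# pass the BigInsights iop.version through to the driver, executors and YARN AM
cat >> spark-defaults.conf <<'EOF'
spark.executor.extraJavaOptions -Diop.version=4.1.0.0
spark.driver.extraJavaOptions -Diop.version=4.1.0.0
spark.yarn.am.extraJavaOptions -Diop.version=4.1.0.0
EOF
echo "-Diop.version=4.1.0.0" > java-opts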


After this my example program ran successfully.

I can now choose which version of Spark I want to run with, and I haven't changed the Ambari-based configuration of BigInsights. This is not ideal if you want a consistent cluster, but it's OK for me running a few quick tests.

Monday, February 22, 2016

Formatting JSON Files in Eclipse

This is just a short post so that next time I don't forget the following.

I've just moved to the Eclipse Mars release and one of the first things I did with the new install was to open a JSON file and find that I didn't have the option to format the JSON from being stretched out on a single line to being pretty-printed so I could read it.

I had forgotten that previously I had installed the json-tools package from the marketplace.
You go to the link, drag the install button onto your workspace and the install kicks off automatically. Simple! Now I have the right-click "Format JSON" option back again.

Monday, January 11, 2016

Aggregating time series events using DB2 SQL

Someone asked me for some SQL to aggregate up some data from a home energy monitor. In this particular case it's a plug controlling a microwave oven and measuring the power that it uses. The input data looks something like the following:

Dev    Meas      Timestamp              Power
Plug    power    1449660961833    109.6
Plug    power    1449660963225    209.2
Plug    power    1449660964291    509.9
Plug    power    1449660965296    1199.2
Plug    power    1449660966530    1197
Plug    power    1449660967766    1195.6
Plug    power    1449660969023    1196.1
Plug    power    1449660970216    1199
Plug    power    1449660972727    1198.1
Plug    power    1449660973929    1197.1
Plug    power    1449660975130    1193.9
Plug    power    1449660976376    667.3
Plug    power    1449660977589    174.8
Plug    power    1449660978800    76.3
Plug    power    1449660980033    51.8
Plug    power    1449660986193    11.7
Plug    power    1449660987398    3.7
Plug    power    1449660988799    1.9
Plug    power    1449661288372    211.8
Plug    power    1449661289734    478
Plug    power    1449661290753    1216.7
Plug    power    1449661292024    1204.9
Plug    power    1449661293171    1202
Plug    power    1449661298122    1201.5
Plug    power    1449661302994    670.6
Plug    power    1449661304224    175.7
Plug    power    1449661305498    76.8


The objective is to translate this into

Start ts                    End ts                  On/Off    Total Power
1449660961833    1449660986193    1    11386.6
1449660987398    1449660988799    0    5.6
1449661288372    1449661312834    1    6534.3
1449661314041    1449666826229    0    24.0
1449667136265    1449667196427    1    39542.4
1449667197658    1449667197658    0    1.7
1449667598249    1449667647221    1    24923.4
1449667648462    1449670435807    0    12.3
1449670609718    1449670646433    1    19924.9
1449670647639    1449670650052    0    7.5
1449671163432    1449671200244    1    18427.8
1449671201449    1449674183396    0    16.0
1449674184637    1449674184637    1    12.6
1449674190220    1449674212614    0    1.4
1449674245143    1449674246497    1    21.4
1449674251344    1449674251344    0    0

This kind of thing is easy to do in SQL using OLAP functions, assuming you have a unique grouping key for each data partition you want to group. The trick here is to generate that key in order to uniquely identify each on and off period for the microwave. This is what I ended up with. It seems long-winded so I want to see if I can simplify it.

-- mark each row as plug on (1) or plug off (0)
with state_data as (
  select ts,
         event,
         case
           when cast(event as integer) > 5 then 1  -- set the on/off threshold value here
           else 0
         end as plug_state
  from events
  where device = 'Plug' and
        name = 'power'
  order by ts asc
),
-- add some information from the previous row to each row
lag_data as (
  select lag(plug_state, 1,0) over ( order by ts) as last_period_plug_state,
         lag(ts, 1,0) over ( order by ts) as last_period_end_ts,
         ts as period_start_ts,
         event as period_event,
         plug_state as period_plug_state
  from state_data
  order by ts asc
),
-- select just the rows where the plug changes from on to off or from off to on
transition_data as (
  select last_period_end_ts, period_start_ts, period_plug_state from lag_data
  where period_plug_state = 0 and last_period_plug_state = 1 or
        period_plug_state = 1 and last_period_plug_state = 0
),
-- create a set of rows that describe the start and end of each on or off period
period_data as (
  select lag(period_start_ts, 1,0) over ( order by period_start_ts) as period_start_ts,
        last_period_end_ts as period_end_ts
  from transition_data
),
-- annotate each row in the full table with the time of the period to which it belongs
annotated_state_data as (
  select s.*, p.*, p.period_start_ts as period_id
  from state_data s, period_data p
  where s.ts >= p.period_start_ts and
        s.ts <= p.period_end_ts
)
-- finally aggregate up each period
select avg(period_start_ts), avg(period_end_ts), avg(plug_state), sum(event)
  from annotated_state_data
  group by period_id

Wednesday, August 26, 2015

BigInsights 4 - Setting up to use SparkR

There is a great post here http://www.r-bloggers.com/installing-and-starting-sparkr-locally-on-windows-os-and-rstudio/ about getting set up with Spark, SparkR and RStudio. I'm not going to repeat the detail but just record a few details about my setup.

With BigInsights 4.1 just having been released (http://www-01.ibm.com/software/data/infosphere/hadoop/trials.html) we now get the IBM Open Platform including Spark 1.4.1 and hence including SparkR. I have BigInsights installed on a cluster but to get started on my Windows 7 machine I downloaded a binary version of Spark 1.4.1 separately.

I tried following the instructions in the linked blog but I got a warning that SparkR had been compiled with R 3.1.3 while I was at an older version (3.0.3). So first of all I upgraded my R version to the latest (3.2.2).

My install of RStudio magically detected the new version of R and ran with that (presumably it picks the version number up from the registry).

The SparkR library now loads without any warnings. However, the sparkR.init() step doesn't complete. There are some posts about this issue out there. I found that the problem was that the scripts provided in the Spark download from Apache were not set as executable, even on Windows. Doing an "ls" in Cygwin gave:

4 -rwxr-xr-x  1 slaws None 3121 Jul  8 23:59 spark-shell
4 -rw-r--r--  1 slaws None 1008 Jul  8 23:59 spark-shell.cmd
4 -rw-r--r--  1 slaws None 1868 Jul  8 23:59 spark-shell2.cmd
4 -rwxr-xr-x  1 slaws None 1794 Jul  8 23:59 spark-sql
4 -rwxr-xr-x  1 slaws None 1291 Jul  8 23:59 spark-submit
4 -rw-r--r--  1 slaws None 1083 Aug 26 10:57 spark-submit.cmd
4 -rw-r--r--  1 slaws None 1374 Jul  8 23:59 spark-submit2.cmd

A quick "chmod 755 *" got me a step closer. Now when running sparkR.init() I get no error response but it still fails to complete. Further investigation showed that, depending on how the system2() call that sparkR.init() uses under the covers is configured, the call out to spark-submit either worked or didn't. This seemed very strange, so somewhat at random I switched to R version 3.1.3 which, from above, you will see is the version that the SparkR library is built against. Lo and behold, it worked. Yay!

Here is the script I'm playing with which is 99% copied from the blog linked at the top of this post.

# primarily copied from
# http://www.r-bloggers.com/installing-and-starting-sparkr-locally-on-windows-os-and-rstudio/

# add SparkR lib dir to library paths
Sys.setenv(SPARK_HOME = "C:/simonbu/big-data/Spark/runtime/spark-1.4.1-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"),"R","lib"), .libPaths()))
.libPaths()
#.Platform$OS.type
#getAnywhere(launchBackend)

# load SparkR
library(SparkR)

# initialise the SparkR context using Spark locally
sc <- sparkR.init(master = "local")
sqlContext <- sparkRSQL.init(sc)

# create a spark data frame with a sample data set
sparkDF <- createDataFrame(sqlContext, faithful)
head(sparkDF)

# Create a simple local data.frame
localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18))

# Convert local data frame to a SparkR DataFrame
convertedDF <- createDataFrame(sqlContext, localDF)

printSchema(sparkDF)
# printSchema(localDF)  - doesn't work
printSchema(convertedDF)

summary(convertedDF)

# Register this DataFrame as a table.
registerTempTable(convertedDF, "people")

# SQL statements can be run by using the sql methods provided by sqlContext
teenagers <- sql(sqlContext, "SELECT name FROM people WHERE age >= 13 AND age <= 19")

# Call collect to get a local data.frame
teenagersLocalDF <- collect(teenagers)

# Print the teenagers in our dataset
print(teenagersLocalDF)

sparkR.stop()


Tuesday, August 25, 2015

Running Titan with Solr - Part 1

I wanted to understand the integration between Titan and Solr. Titan can use Solr (or Elastic Search) to index vertex and edge properties but it's not clear from reading the internet how this works and whether you have any sensible access to the created index outside of the Titan API.

Installing Titan 

I started with the recent Titan 0.9.0 M2 release from here http://s3.thinkaurelius.com/downloads/titan/titan-0.9.0-M2-hadoop1.zip

I unzipped that and tried to fire up gremlin with

bin/gremlin.bat

It failed complaining that the classpath was too long so I made a few changes to the script. I commented out the part that loops round collecting all the individual jar paths.

::set CP=
::for %%i in (%LIBDIR%\*.jar) do call :concatsep %%i

 Further down I directly set the classpath to be just the jars from the lib directory using a wildcard. I.e. I was going for the shortest classpath possible.

::set CLASSPATH=%CP%;%OLD_CLASSPATH%
set CLASSPATH=./lib/*

This seemed to do the trick but then I fell over Java version problems. I upgraded to Java 8. The Titan jars wouldn't run with the IBM JDK for some reason so I ended up with the Oracle JDK and set my environment accordingly.

set path=C:\simon\apps\jdk-8-51-sun\bin;c:\Windows\system32
set JAVA_HOME=C:\simon\apps\jdk-8-51-sun

 Installing Solr

With the gremlin shell now running I set about getting Solr going. I downloaded 5.2.1 from here: http://archive.apache.org/dist/lucene/solr/

After unzipping I had a poke about. As a first-time user it's hard to work out how to get it going. I wasn't sure whether I needed a standalone setup or a cloud setup for my simple testing. The thing that was really confusing me was the question of what schema was required.

I tried the cloud example

bin\solr start -e cloud

and answered the questions. This brought up Solr at http://localhost:8983/solr. But then I wanted to see if I could add some data so I tried the post example.

bin/post -c gettingstarted docs/

But that didn't work as it complained that it didn't understand the fields that were being pushed in. I tried creating a "core" in the admin UI but couldn't work out how to make that hang together. Eventually I found:

bin\solr start -e schemaless

And life was good! I was able to run the post example and see the data in the index.
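If you want to check the index from the command line rather than the admin UI, a query along these lines should show the posted documents (assuming the default gettingstarted collection and port):

bin/post -c gettingstarted docs/
curl "http://localhost:8983/solr/gettingstarted/select?q=*:*&wt=json&rows=5"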

Having got Solr going I set about creating a core to store the Titan index. I went with:

name: titan
instance: /tmp/solr-titan/titan
data: /tmp/solr-titan/data

I copied the contents of titan/conf/solr to /tmp/solr-titan/titan-core/conf

I had to comment out some stuff to do with geo in the schema.xml due to a class-not-found problem, and then I successfully created the titan core.
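For what it's worth, the core can also be created from the command line rather than the admin UI, something like this (assuming the conf directory I copied above):

bin/solr create -c titan -d /tmp/solr-titan/titan-core/conf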

I started out with a different name for the core but had to come back and rename it to "titan". See below.

Connecting Titan to Solr

Having got Titan and Solr working I now needed to start Titan with a suitable connection to Solr. From gremlin.sh I did:

graph = TitanFactory.open('conf/titan-berkeleyje.properties')

This command creates a Titan database runtime within the gremlin process based on the configuration in the properties file. If you look inside, the file just tells Titan where to find Solr. Here is a subset of the file:

storage.directory=../db/berkeley
index.search.backend=solr
index.search.solr.mode=http
index.search.solr.http-urls=http://localhost:8983/solr

I just used the settings as supplied. There are a couple of gotchas here though.

Firstly the Solr core name isn't defined and Titan assumes it will be called "titan". I called it something else first and had to go back and rename it.

Secondly, when you run this TitanFactory.open command it creates the database based on the storage.directory property and caches all of these properties in the database, so when you restart Titan it's ready to go. The downside is that the configuration sticks: I tried a couple of configurations before settling on this one, the first involving Elasticsearch, and I was subsequently confused that when I went back to run against Solr I was getting the following error:

15/08/17 14:45:36 INFO util.ReflectiveConfigOptionLoader: Loaded and initialized config classes: 12 OK out of 12 attempts in PT0.196S
15/08/17 14:45:36 WARN configuration.GraphDatabaseConfiguration: Local setting index.search.solr.mode=http (Type: GLOBAL_OFFLINE) is overridden by globally managed value (cloud).  Use the ManagementSystem interface instead of the local configuration to control this setting.
15/08/17 14:45:36 WARN configuration.GraphDatabaseConfiguration: Local setting index.search.backend=solr (Type: GLOBAL_OFFLINE) is overridden by globally managed value (elasticsearch).  Use the ManagementSystem interface instead of the local configuration to control this setting.
15/08/17 14:45:36 INFO configuration.GraphDatabaseConfiguration: Generated unique-instance-id=0914d88f7404-R9E67YR1
15/08/17 14:45:36 INFO diskstorage.Backend: Configuring index [search]
15/08/17 14:45:37 INFO elasticsearch.plugins: [Blink] loaded [], sites []
15/08/17 14:45:39 INFO es.ElasticSearchIndex: Configured remote host: 127.0.0.1:9300
Could not instantiate implementation: com.thinkaurelius.titan.diskstorage.es.ElasticSearchIndex
Display stack trace? [yN]

The answer is to simply delete the data, in my case the directory db/berkeley, and start again.
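In other words, something like this from the Titan install directory (the exact path depends on where the relative storage.directory setting resolves from):

# wipe the graph data, and with it the cached global configuration, so it can be recreated
rm -rf db/berkeley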

This got Titan and Solr up and running and I was ready to create a graph and look at what index was generated. I'll create a separate post about that.

Tuesday, July 14, 2015

BigInsights 4 - Creating An Eclipse and Spark Development Environment

I've just started developing my big data applications with Spark and the thing I wanted most was a development environment for Eclipse (I'm not an IntelliJ user) that allowed me to build and run Spark/Scala applications locally. Fortunately all of the components exist to make this happen and, being a long-time Java/Maven/Eclipse user, getting them working together was relatively straightforward. The approach is already described in a number of articles (Google for Spark and Eclipse) but I wanted to make it work with the jar versions that ship with BigInsights v4.0.0.1. Here's what I did.

Dependencies

The major dependencies I'm working with are as follows:

Windows 7
IBM JDK 7 (SR3 build 2.7)
Scala 2.10.4
Spark 1.2.1
Eclipse Juno SR 2
Maven 3.0.5

Summary Of The Steps Involved



1 - Install the base tools listed above (JDK, Eclipse and Maven)
2 - Install the Scala Eclipse plugin from the update site (http://download.scala-ide.org/sdk/helium/e38/scala210/stable/site); previous stable update sites are listed here (http://scala-ide.org/download/prev-stable.html)
3 - Create a Maven project for Spark development
4 - Set up Eclipse so that it can import the maven project (http://scala-ide.org/docs/tutorials/m2eclipse/)
4a - Install the winutils.exe so that Hadoop will work on Windows 7 (for windows users only)
5 - Import the project into Eclipse as a maven project 

I describe steps 3, 4a and 5 in more detail below.

Creating A Maven Project For Spark Development (Step 3 Detail)

In some directory create a new project directory with the following structure:


SomeDirectory/
    MySparkProject/
       src/
           main/
                scala/

       pom.xml  


There are probably Maven archetypes for creating these directories but I didn't check as for my purposes I only needed this very simple structure. The pom.xml file is obviously the interesting thing here. I started with the following contents:




<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>

   <groupId>com.ibm.ets</groupId>
   <artifactId>MyProject</artifactId>
   <version>0.1-SNAPSHOT</version>

   <name>MyProject</name>


    <repositories>
        <repository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </repository>
    </repositories>
    <pluginRepositories>
      <pluginRepository>
        <id>scala-tools.org</id>
        <name>Scala-Tools Maven2 Repository</name>
        <url>http://scala-tools.org/repo-releases</url>
      </pluginRepository>
    </pluginRepositories>

   <dependencies>
     <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.10.4</version>
      </dependency>

      <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-core_2.10</artifactId>
         <version>1.2.1</version>
      </dependency>
     
      <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-sql_2.10</artifactId>
         <version>1.2.1</version>
      </dependency>
     
      <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-hive_2.10</artifactId>
         <version>1.2.1</version>
      </dependency>     
     
      <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-streaming_2.10</artifactId>
         <version>1.2.1</version>
      </dependency>
     
      <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-mllib_2.10</artifactId>
         <version>1.2.1</version>
      </dependency>
     
      <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-graphx_2.10</artifactId>
         <version>1.2.1</version>
      </dependency>
     
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0</version>
      </dependency>

   </dependencies>

<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                        <source>1.7</source>
                        <target>1.7</target>
                      </configuration>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

</project>



I've selected Scala, Spark and Hadoop versions that match those shipped in BigInsights 4.


You can now build Scala/Spark (or Java/Spark) programs here, using your favorite text editor, e.g. notepad or vi, and compile them using Maven to produce a jar for use with spark-submit, as sketched below. However I wanted to do my development in Eclipse.
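A minimal command-line round trip looks something like this (the jar name follows the usual Maven convention for the artifactId and version in the pom above; the main class name is a placeholder for whatever you write):

cd MySparkProject
mvn clean package
spark-submit --class SomeMainObject --master "local[*]" target/MyProject-0.1-SNAPSHOT.jar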

Installing winutils So This All Works on Windows (Step 4a Detail)

This is of course for Windows users only.

I initially got the following error when running a Spark program from Eclipse
 
Could not locate executable null\bin\winutils.exe
 
There are lots of posts about this and the solution is easy. Here is an example http://stackoverflow.com/questions/19620642/failed-to-locate-the-winutils-binary-in-the-hadoop-binary-path. 

I copied the winutils.exe to a directory and then set the HADOOP_HOME variable as follows:

HADOOP_HOME=C:\simon\svn\bigdata2\Spark\winutil 
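Judging by the "null\bin\winutils.exe" in the error above, Hadoop expects to find the exe at %HADOOP_HOME%\bin\winutils.exe, so the directory you copy it to should be a bin subdirectory of whatever HADOOP_HOME points at. From a Windows command prompt that's roughly (my paths):

mkdir C:\simon\svn\bigdata2\Spark\winutil\bin
copy winutils.exe C:\simon\svn\bigdata2\Spark\winutil\bin\
setx HADOOP_HOME "C:\simon\svn\bigdata2\Spark\winutil"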

Then remember to restart Eclipse! 
 

Importing The Project Into Eclipse (Step 5 Detail)

You will have installed the "maven 2 eclipse" plugin at step 4 (see the summary of steps above) so you can now import your Spark maven project using Eclipse's "Import -> Maven -> Existing Maven Projects" option.

To run Spark programs from Eclipse locally you need to configure Spark to run locally (rather than on a cluster). As a convenience I pass a parameter into my code to indicate whether I'm running in Eclipse. I modified the sample Spark code as follows to process the parameter:

import org.apache.spark.SparkContext;
import org.apache.spark.SparkContext._;
import org.apache.spark.SparkConf;
import org.apache.spark.rdd.RDD;


object SparkTutorial {
  def main(args: Array[String]) {
    var conf:SparkConf = null;
    var taxi:RDD[String] = null;
   
    if (args.length > 0 && args(0) == "local") {
      conf = new SparkConf().setAppName("Spark Tutorial").setMaster("local[*]");
    } else {
      conf = new SparkConf().setAppName("Spark Tutorial").setMaster("yarn-cluster");
    }

    val sc = new SparkContext(conf);
   
    if (args.length > 0 && args(0) == "local") {
       taxi = sc.textFile("nyctaxisub.csv", 2);
    } else {
       taxi = sc.textFile("/user/spark/simonstuff/sparkdata/nyctaxisub/*");
    }
  
    val taxiSplitColumns = taxi.map(line=>line.split(','));
    val taxiMedCountsTuples = taxiSplitColumns.map(vals=>(vals(6),1));
    val taxiMedCountsOneLine = taxiMedCountsTuples.reduceByKey((x,y) => x + y);
   
   
    for (pair<-taxiMedCountsOneLine.map(_.swap).top(10)){
      println("Taxi Medallion %s had %s Trips".format(pair._2,pair._1));
    }
  }
}

Note that I check twice whether a single parameter value of "local" is passed into the program. If so, the setMaster("local[*]") call instructs Spark to run locally, i.e. it doesn't require a separate cluster installation. It also reads the input data from a local directory path rather than from an HDFS directory path.

I pass the parameter into the program via an Eclipse run configuration, adding "local" as a program argument on the Arguments tab.


Running On a Cluster


Having developed your Spark application in Eclipse you probably want to run it on a cluster. I export the project as a Java jar file from Eclipse. You can also generate the Jar file by going back to your Maven project and building it there, for example, by running mvn package.  

Once you have the Jar you can copy it to the head node of your cluster and run it using Spark, for example,
spark-submit --class "SparkTutorial" --master yarn-cluster /SomeDirectory/MyProject/target/MyProject.jar