Wednesday, August 26, 2015

BigInsights 4 - Setting up to use SparkR

There is a great post here http://www.r-bloggers.com/installing-and-starting-sparkr-locally-on-windows-os-and-rstudio/ about getting set up with Spark, SparkR and RStudio. I'm not going to repeat it all but will just record a few details about my setup.

With BigInsights 4.1 having just been released (http://www-01.ibm.com/software/data/infosphere/hadoop/trials.html), we now get the IBM Open Platform including Spark 1.4.1 and hence SparkR. I have BigInsights installed on a cluster, but to get started on my Windows 7 machine I downloaded a binary version of Spark 1.4.1 separately.

I tried following the instructions in the linked blog but I got a warning that SparkR had been compiled with R 3.1.3 while I was at an older version (3.0.3). So first of all I upgraded my R version to the latest (3.2.2).

My install of RStudio magically detected the newly installed version of R and ran with that (presumably it picks the version number up from the registry).

The SparkR library now loads without any warnings. However, the sparkR.init() step doesn't complete. There are some posts about this issue out there. I found that the problem was that the .cmd scripts provided in the Spark download from Apache were not set as executable, which matters even on Windows. Doing an "ls" in Cygwin gave:

4 -rwxr-xr-x  1 slaws None 3121 Jul  8 23:59 spark-shell
4 -rw-r--r--  1 slaws None 1008 Jul  8 23:59 spark-shell.cmd
4 -rw-r--r--  1 slaws None 1868 Jul  8 23:59 spark-shell2.cmd
4 -rwxr-xr-x  1 slaws None 1794 Jul  8 23:59 spark-sql
4 -rwxr-xr-x  1 slaws None 1291 Jul  8 23:59 spark-submit
4 -rw-r--r--  1 slaws None 1083 Aug 26 10:57 spark-submit.cmd
4 -rw-r--r--  1 slaws None 1374 Jul  8 23:59 spark-submit2.cmd

A quick "chmod 755 *" got me a step closer. Now when running sparkR.init() I get no error response but it still fails to complete. Further investigation showed that the call out to spark-submit worked or didn't work depending on how the system2() call that sparkR.init() uses under the covers is configured. This seemed very strange so, somewhat at random, I switched to R version 3.1.3 which, as you will see from above, is the version that the SparkR library is built against. Lo and behold, it worked. Yay!
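The permission fix can be sketched as a small shell script for Cygwin or any POSIX shell. This is only a sketch under my own assumptions: the function names are made up for illustration, and it assumes SPARK_HOME points at the unpacked Spark download. Instead of a blanket chmod it touches regular files only and then lists anything that is still not executable.

```shell
#!/bin/sh
# Sketch only: the function names are illustrative, not part of Spark.

# Mark every regular file in the given directory as rwxr-xr-x.
make_executable() {
    for f in "$1"/*; do
        [ -f "$f" ] && chmod 755 "$f"
    done
    return 0
}

# Print any regular file in the given directory that is still not executable.
list_not_executable() {
    for f in "$1"/*; do
        if [ -f "$f" ] && [ ! -x "$f" ]; then
            echo "$f"
        fi
    done
    return 0
}

# Assumption: SPARK_HOME is set to the unpacked Spark directory.
if [ -n "${SPARK_HOME:-}" ] && [ -d "$SPARK_HOME/bin" ]; then
    make_executable "$SPARK_HOME/bin"
    list_not_executable "$SPARK_HOME/bin"
fi
```

If the second function prints nothing, every script in the bin directory is executable.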

Here is the script I'm playing with which is 99% copied from the blog linked at the top of this post.

# primarily copied from
# http://www.r-bloggers.com/installing-and-starting-sparkr-locally-on-windows-os-and-rstudio/

# add SparkR lib dir to library paths
Sys.setenv(SPARK_HOME = "C:/simonbu/big-data/Spark/runtime/spark-1.4.1-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"),"R","lib"), .libPaths()))
.libPaths()
#.Platform$OS.type
#getAnywhere(launchBackend)

# load SparkR
library(SparkR)

# initialise the SparkR context using Spark locally
sc <- sparkR.init(master = "local")
sqlContext <- sparkRSQL.init(sc)

# create a spark data frame with a sample data set
sparkDF <- createDataFrame(sqlContext, faithful)
head(sparkDF)

# Create a simple local data.frame
localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18))

# Convert local data frame to a SparkR DataFrame
convertedDF <- createDataFrame(sqlContext, localDF)

printSchema(sparkDF)
# printSchema(localDF)  - doesn't work
printSchema(convertedDF)

summary(convertedDF)

# Register this DataFrame as a table.
registerTempTable(convertedDF, "people")

# SQL statements can be run by using the sql methods provided by sqlContext
teenagers <- sql(sqlContext, "SELECT name FROM people WHERE age >= 13 AND age <= 19")

# Call collect to get a local data.frame
teenagersLocalDF <- collect(teenagers)

# Print the teenagers in our dataset
print(teenagersLocalDF)

sparkR.stop()


Tuesday, August 25, 2015

Running Titan with Solr - Part 1

I wanted to understand the integration between Titan and Solr. Titan can use Solr (or Elastic Search) to index vertex and edge properties but it's not clear from reading the internet how this works and whether you have any sensible access to the created index outside of the Titan API.

Installing Titan 

I started with the recent Titan 0.9.0 M2 release from here http://s3.thinkaurelius.com/downloads/titan/titan-0.9.0-M2-hadoop1.zip

I unzipped that and tried to fire up gremlin with

bin/gremlin.bat

It failed complaining that the classpath was too long so I made a few changes to the script. I commented out the part that loops round collecting all the individual jar paths.

::set CP=
::for %%i in (%LIBDIR%\*.jar) do call :concatsep %%i

 Further down I directly set the classpath to be just the jars from the lib directory using a wildcard. I.e. I was going for the shortest classpath possible.

::set CLASSPATH=%CP%;%OLD_CLASSPATH%
set CLASSPATH=./lib/*

This seemed to do the trick but then I fell over Java version problems. I upgraded to Java 8. The Titan jars wouldn't run with the IBM JDK for some reason so I ended up with the Oracle JDK and set my environment accordingly.

set path=C:\simon\apps\jdk-8-51-sun\bin;c:\Windows\system32
set JAVA_HOME=C:\simon\apps\jdk-8-51-sun

Installing Solr

With the gremlin shell now running I set about getting solr going. I downloaded 5.2.1 from here http://archive.apache.org/dist/lucene/solr/

After unzipping I had a poke about. As a first time user it's hard to work out how to get it going. I wasn't sure whether I needed a standalone setup or a cloud setup for my simple testing. The thing that was really confusing me was the question of what schema was required.

I tried the cloud example

bin\solr start -e cloud

and answered the questions. This brought up Solr at http://localhost:8983/solr. But then I wanted to see if I could add some data so I tried the post example.

bin/post -c gettingstarted docs/

But that didn't work as it complained that it didn't understand the fields that were being pushed in. I tried creating a "core" in the admin UI but couldn't work out how to make that hang together. Eventually I found:

bin\solr start -e schemaless

And life was good! I was able to run the post example and see the data in the index.

Having got Solr going I set about creating a core to store the Titan index. I went with.

name: titan
instance: /tmp/solr-titan/titan
data: /tmp/solr-titan/data

I copied the contents of titan/conf/solr to /tmp/solr-titan/titan-core/conf.

I had to comment out some of the geo-related entries in schema.xml due to a class-not-found problem, and then I successfully created the titan core.

I started out with a different name for the core but had to come back and rename it to "titan". See below.

Connecting Titan to Solr

Having got Titan and Solr working I now needed to start Titan with a suitable connection to Solr. From gremlin.sh I did.

graph = TitanFactory.open('conf/titan-berkeleyje.properties')

This command creates a Titan database runtime within the gremlin process based on the configuration in the properties file. If you look inside, the file just tells Titan where to find Solr. Here is a subset of the file.

storage.directory=../db/berkeley
index.search.backend=solr
index.search.solr.mode=http
index.search.solr.http-urls=http://localhost:8983/solr

I just used the settings as supplied. There are a couple of gotchas here though.

Firstly the Solr core name isn't defined and Titan assumes it will be called "titan". I called it something else first and had to go back and rename it.

Secondly, when you run this TitanFactory.open command it creates the database based on the storage.directory property and caches all of these properties in the database, so when you restart Titan it's ready to go. The downside is that the cached values then override whatever is in the properties file. I had tried a couple of configurations before settling on this one, the first of which involved Elastic Search, so when I subsequently tried to run against Solr I was confused to get the following error.

15/08/17 14:45:36 INFO util.ReflectiveConfigOptionLoader: Loaded and initialized config classes: 12 OK out of 12 attempts in PT0.196S
15/08/17 14:45:36 WARN configuration.GraphDatabaseConfiguration: Local setting index.search.solr.mode=http (Type: GLOBAL_OFFLINE) is overridden by globally managed value (cloud).  Use the ManagementSystem interface instead of the local configuration to control this setting.
15/08/17 14:45:36 WARN configuration.GraphDatabaseConfiguration: Local setting index.search.backend=solr (Type: GLOBAL_OFFLINE) is overridden by globally managed value (elasticsearch).  Use the ManagementSystem interface instead of the local configuration to control this setting.
15/08/17 14:45:36 INFO configuration.GraphDatabaseConfiguration: Generated unique-instance-id=0914d88f7404-R9E67YR1
15/08/17 14:45:36 INFO diskstorage.Backend: Configuring index [search]
15/08/17 14:45:37 INFO elasticsearch.plugins: [Blink] loaded [], sites []
15/08/17 14:45:39 INFO es.ElasticSearchIndex: Configured remote host: 127.0.0.1 : 9300
Could not instantiate implementation: com.thinkaurelius.titan.diskstorage.es.ElasticSearchIndex
Display stack trace? [yN]

The answer is simply to delete the data, in my case the db/berkeley directory, and start again.
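A slightly more cautious variant is to move the storage directory aside rather than delete it, so the old graph can be put back if needed. Here is a minimal shell sketch, assuming the gremlin process is stopped and the path matches storage.directory from the properties file. The function name and the backup suffix are my own inventions.

```shell
#!/bin/sh
# Sketch only: move a Titan storage directory aside so that the next
# TitanFactory.open() starts from an empty database and picks up the
# settings in the properties file again.
# The function name and the ".bak-<timestamp>" suffix are illustrative.
reset_titan_storage() {
    storage_dir="${1%/}"   # drop a trailing slash, if any
    if [ ! -d "$storage_dir" ]; then
        echo "nothing to reset: $storage_dir does not exist"
        return 0
    fi
    backup_dir="$storage_dir.bak-$(date +%Y%m%d%H%M%S)"
    mv "$storage_dir" "$backup_dir" || return 1
    echo "moved $storage_dir to $backup_dir"
}

# Example, run from the Titan install directory with gremlin shut down:
# reset_titan_storage db/berkeley
```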

This got Titan and Solr up and running and I was ready to create a graph and look at what index was generated. I'll create a separate post about that.






Tuesday, July 14, 2015

BigInsights 4 - Creating An Eclipse and Spark Development Environment

I've just started developing my big data applications with Spark and the thing that I wanted most was a development environment for Eclipse (I'm not an IntelliJ user) that allowed me to build and run Spark/Scala applications locally. Fortunately all of the components needed to make this happen already exist and, being a long time Java/Maven/Eclipse user, I found getting them working together relatively straightforward. The approach is already described in a number of articles (Google for Spark and Eclipse) but I wanted to make it work with the Jar versions that ship with BigInsights v4.0.0.1. Here's what I did.

Dependencies

The major dependencies I'm working with are as follows:

Windows 7
IBM JDK 7 (SR3 build 2.7)
Scala 2.10.4
Spark 1.2.1
Eclipse Juno SR 2
Maven 3.0.5

Summary Of The Steps Involved



2 - Install the Scala Eclipse plugin from the update site (http://download.scala-ide.org/sdk/helium/e38/scala210/stable/site); other update sites are listed here (http://scala-ide.org/download/prev-stable.html)
3 - Create a Maven project for Spark development
4 - Set up Eclipse so that it can import the Maven project (http://scala-ide.org/docs/tutorials/m2eclipse/)
4a - Install winutils.exe so that Hadoop will work on Windows 7 (for Windows users only)
5 - Import the project into Eclipse as a Maven project

I describe steps 3, 4a and 5 in more detail below.

Creating A Maven Project For Spark Development (Step 3 Detail)

In some directory create a new project directory with the following structure:


SomeDirectory/
    MySparkProject/
       src/
           main/
                scala/

       pom.xml  
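For what it's worth, the skeleton above can be created from a shell in one go. This is a minimal sketch: the function name is mine, and pom.xml is only created as an empty placeholder, to be filled in by hand.

```shell
#!/bin/sh
# Sketch only: create the minimal Maven layout for a Scala/Spark project.
# The function name is illustrative.
create_spark_project() {
    project_dir="$1"
    mkdir -p "$project_dir/src/main/scala" || return 1
    # Leave an existing pom.xml alone; otherwise create an empty placeholder.
    [ -e "$project_dir/pom.xml" ] || : > "$project_dir/pom.xml"
}

# Example:
# create_spark_project SomeDirectory/MySparkProject
```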


There are probably Maven archetypes for creating these directories but I didn't check as for my purposes I only needed this very simple structure. The pom.xml file is obviously the interesting thing here. I started with the following contents:




<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>

   <groupId>com.ibm.ets</groupId>
   <artifactId>MyProject</artifactId>
   <version>0.1-SNAPSHOT</version>

   <name>MyProject</name>


    <repositories>
        <repository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </repository>
    </repositories>
    <pluginRepositories>
      <pluginRepository>
        <id>scala-tools.org</id>
        <name>Scala-Tools Maven2 Repository</name>
        <url>http://scala-tools.org/repo-releases</url>
      </pluginRepository>
    </pluginRepositories>

   <dependencies>
     <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.10.4</version>
      </dependency>

      <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-core_2.10</artifactId>
         <version>1.2.1</version>
      </dependency>
     
      <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-sql_2.10</artifactId>
         <version>1.2.1</version>
      </dependency>
     
      <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-hive_2.10</artifactId>
         <version>1.2.1</version>
      </dependency>     
     
      <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-streaming_2.10</artifactId>
         <version>1.2.1</version>
      </dependency>
     
      <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-mllib_2.10</artifactId>
         <version>1.2.1</version>
      </dependency>
     
      <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-graphx_2.10</artifactId>
         <version>1.2.1</version>
      </dependency>
     
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0</version>
      </dependency>

   </dependencies>

<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                        <source>1.7</source>
                        <target>1.7</target>
                      </configuration>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

</project>



I've selected Scala, Spark and Hadoop versions that match with those shipped in BigInsights 4.


You can now build Scala/Spark (or Java/Spark) programs here, using your favorite text editor, e.g. notepad or vi, and compile them using Maven to produce a jar for use with spark-submit. However I wanted to do my development in Eclipse.

Installing winutils So This All Works on Windows (Step 4a Detail)

This is of course for Windows users only.

I initially got the following error when running a Spark program from Eclipse:
 
Could not locate executable null\bin\winutils.exe
 
There are lots of posts about this and the solution is easy. Here is an example http://stackoverflow.com/questions/19620642/failed-to-locate-the-winutils-binary-in-the-hadoop-binary-path. 

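I copied winutils.exe to a directory and then set the HADOOP_HOME variable as follows. Hadoop looks for the executable at %HADOOP_HOME%\bin\winutils.exe (note the "null\bin" in the error above, where HADOOP_HOME was not set), so it needs to sit in a bin subdirectory of whatever HADOOP_HOME points to.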

HADOOP_HOME=C:\simon\svn\bigdata2\Spark\winutil 

Then remember to restart Eclipse! 
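Before restarting Eclipse it is worth checking that the executable really is where Hadoop will look. Here is a small shell sketch for Cygwin. The function name is my own, and the example assumes the cygpath tool that ships with Cygwin is used to convert the Windows path.

```shell
#!/bin/sh
# Sketch only: verify that winutils.exe is in the bin subdirectory of the
# given Hadoop home directory. The function name is illustrative.
check_winutils() {
    hadoop_home="$1"
    if [ -z "$hadoop_home" ]; then
        echo "HADOOP_HOME is not set"
        return 1
    fi
    if [ -f "$hadoop_home/bin/winutils.exe" ]; then
        echo "found $hadoop_home/bin/winutils.exe"
        return 0
    fi
    echo "missing $hadoop_home/bin/winutils.exe"
    return 1
}

# Example (Cygwin, with HADOOP_HOME set as a Windows path):
# check_winutils "$(cygpath -u "$HADOOP_HOME")"
```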
 

Importing The Project Into Eclipse (Step 5 Detail)

You will have installed the "maven 2 eclipse" plugin at step 4 (see the summary of steps above) so you can now import your Spark Maven project using the File > Import > Maven > Existing Maven Projects option.

 


To run Spark programs from Eclipse locally you need to configure Spark to run locally (rather than on a cluster). As a convenience I pass a parameter into my code to indicate whether I'm running in Eclipse. I modified the sample Spark code as follows to process the parameter. 

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD


object SparkTutorial {
  def main(args: Array[String]): Unit = {
    var conf: SparkConf = null
    var taxi: RDD[String] = null

    // Run inside the local JVM when the first argument is "local", otherwise on YARN
    if (args.length > 0 && args(0) == "local") {
      conf = new SparkConf().setAppName("Spark Tutorial").setMaster("local[*]")
    } else {
      conf = new SparkConf().setAppName("Spark Tutorial").setMaster("yarn-cluster")
    }

    val sc = new SparkContext(conf)

    // Read from a local file when running locally, otherwise from HDFS
    if (args.length > 0 && args(0) == "local") {
      taxi = sc.textFile("nyctaxisub.csv", 2)
    } else {
      taxi = sc.textFile("/user/spark/simonstuff/sparkdata/nyctaxisub/*")
    }

    // Count the trips per taxi medallion (the field at index 6 of each CSV line)
    val taxiSplitColumns = taxi.map(line => line.split(','))
    val taxiMedCountsTuples = taxiSplitColumns.map(vals => (vals(6), 1))
    val taxiMedCountsOneLine = taxiMedCountsTuples.reduceByKey((x, y) => x + y)

    // Swap to (count, medallion) so that top(10) orders by trip count
    for (pair <- taxiMedCountsOneLine.map(_.swap).top(10)) {
      println("Taxi Medallion %s had %s Trips".format(pair._2, pair._1))
    }
  }
}

Note that I check twice whether a single parameter value of "local" is passed into the program. If so, the call setMaster("local[*]") instructs Spark to run locally, i.e. it doesn't require a separate cluster installation. The program also reads the input data from a local directory path rather than from an HDFS directory path.

I pass the parameter into the program via an Eclipse run configuration, by setting the program arguments on the Arguments tab to the single value local.


Running On a Cluster


Having developed your Spark application in Eclipse you probably want to run it on a cluster. I export the project as a Java jar file from Eclipse. You can also generate the Jar file by going back to your Maven project and building it there, for example, by running mvn package.  

Once you have the Jar you can copy it to the head node of your cluster and run it using Spark, for example:
spark-submit --class "SparkTutorial" --master yarn-cluster /SomeDirectory/MyProject/target/MyProject.jar
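Since the program above decides between local and cluster mode from its first argument, the two launch commands differ only in the master and that trailing argument. The following shell sketch just prints the command for each case. The function name is my own, and passing "local" to spark-submit like this is my assumption about how you might reuse the same jar outside Eclipse.

```shell
#!/bin/sh
# Sketch only: print the spark-submit command for the SparkTutorial class.
# Passing "local" as the second argument appends it as the program argument,
# which the program checks via args(0). The function name is illustrative.
submit_command() {
    jar="$1"
    mode="${2:-cluster}"
    if [ "$mode" = "local" ]; then
        echo "spark-submit --class SparkTutorial --master 'local[*]' $jar local"
    else
        echo "spark-submit --class SparkTutorial --master yarn-cluster $jar"
    fi
}

submit_command /SomeDirectory/MyProject/target/MyProject.jar
```

The quotes around local[*] stop the shell from treating the brackets as a filename pattern if the printed command is pasted back into a terminal.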