Monday, August 12, 2013

Installing Hive using Cloudera Manager CDH4.6+ on an already set HDFS Cluster

Hive is part of the Hadoop project currently licensed under Apache License.
Hive is a data warehouse system for Hadoop that facilitates easy data summarization, ad-hoc queries, and the analysis of large datasets stored in Hadoop compatible file systems. Hive provides a mechanism to project structure onto this data and query the data using a SQL-like language called HiveQL. At the same time this language also allows traditional map/reduce programmers to plug in their custom mappers and reducers when it is inconvenient or inefficient to express this logic in HiveQL.

Hive home: http://hive.apache.org/

 

Apache Hive Getting Started Guide

Check out the Getting Started Guide on the Apache Hive wiki.

How to install Hive using Cloudera Manager (CM) CDH 4.+

 

 Prerequisite: The Hadoop Cluster

The following supposes that CDH 4.x was used to setup a Hadoop cluster, and that the Hadoop services (HDFS, MapReduce, Zookeeper) are up and running.

 

Adding the Hive Service

From the Cloudera Manager (CM) menu, top right, click on Actions:


In the drop-down menu, select "Add Service" then "Hive". Accept the dependency on Zookeeper.

Depending on the size of the cluster, it might be good to have an external database for all the Relational data required by Hive to support SQL like syntax.

 

Configuration For Hive to use an external Database and not an Embedded one

Very well documented on the Cloudera web site: Remote DB Installation Guide

 

Hive shell RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient

The thing is that on both my clusters, the same issue came up when launching the "hive" shell:

  > hive
  hive > SHOW TABLES;


Generated the following error (in both cases):

root@master1:/var/log/hadoop-hdfs# hive
Logging initialized using configuration in jar:file:/usr/lib/hive/lib/hive-common-0.10.0-cdh4.2.0.jar!/hive-log4j.properties
Hive history file=/tmp/root/hive_job_log_root_201308120940_141559527.txt
hive> show tables;
FAILED: Error in metadata: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask


And the (abbreviated) associated log traces (hive.log):

2013-08-12 09:40:45,119 ERROR metastore.RetryingRawStore (RetryingRawStore.java:invoke(133)) - JDO datastore error. Retrying metastore command after 1000 ms (attempt 1 of 1)
2013-08-12 09:40:46,133 ERROR exec.Task (SessionState.java:printError(401)) - FAILED: Error in metadata: java.lang.RuntimeException: Unable to instantiate 

[..]
org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1082)
    ... 23 more
Caused by: javax.jdo.JDODataStoreException: Required table missing : "DBS" in Catalog "" Schema "". DataNucleus requires this table to perform its persistence operations. Either your MetaData is incorrect, or you need to enable "datanucleus.autoCreateTables"
NestedThrowables:
org.datanucleus.store.rdbms.exceptions.MissingTableException: Required table missing : "DBS" in Catalog "" Schema "". DataNucleus requires this table to perform its persistence operations. Either your MetaData is incorrect, or you need to enable "datanucleus.autoCreateTables"
 [..]

org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:61)
    at org.apache.hadoop.hive.metastore.HiveMetaStore.newHMSHandler(HiveMetaStore.java:4010)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:120)
    ... 28 more
Caused by: org.datanucleus.store.rdbms.exceptions.MissingTableException: Required table missing : "DBS" in Catalog "" Schema "". DataNucleus requires this table to perform its persistence operations. Either your MetaData is incorrect, or you need to enable "datanucleus.autoCreateTables"
    at org.datanucleus.store.rdbms.table.AbstractTable.exists(AbstractTable.java:455)
[..]

    at org.datanucleus.store.rdbms.RDBMSStoreManager.getExtent(RDBMSStoreManager.java:1332)
    at org.datanucleus.ObjectManagerImpl.getExtent(ObjectManagerImpl.java:4149)
    ... 52 more

2013-08-12 09:40:46,140 ERROR ql.Driver (SessionState.java:printError(401)) - FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask


Note: Cloudera redirects the Hive shell logs to a temporary directory (not in /var/log/hive as could be expected).  /var/log/hive is reserved for the MetaStore process and not the shell logs.  The following location(s) are used instead:
 /tmp/hadoop/hive.log
or
 /tmp/root/hive.log

There can be many reasons for the following exception as can be found on slashdot.org.  In most cases it is due to a problem communicating with the remote database, or an access right issue when the hive shell attempts to modify some of the tables. 


In my case, I am using MySQL as the remote DB server. And when connecting to MySQL, all the Hive MetaData tables are there, and the "hive" user has full access rights on the database. So access rights are not the issue.

It is still important to check in /var/log/hive that the right properties for the remote DB are set and used by the Hive MetaStore. The /var/log/hive/hadoop-....log.out logs will display an INFO log with used properties:

 2013-08-08 13:54:43,251 INFO DataNucleus.Persistence: DataNucleus Persistence Factory initialised for datastore URL="jdbc:mysql://localhost:3306/hive?useUnicode=true&characterEncoding=UTF-8" driver="com.mysql.jdbc.Driver" userName="hive"

But, what is critical is that the hive shell finds and uses the right properties.

In the Cloudera case, the Hive component is found in /usr/lib/hive and the "conf" directory is a link to a specific directory '''/etc/hive/conf''' that contains the Hive shell hive-site.xml

What to do?

Verify there that the config file does contain the remote DB information (javax.jdo.option.ConnectionURL, javax.jdo.option.ConnectionDriverName):

 <?xml version="1.0" encoding="UTF-8"?>
 <!--Autogenerated by Cloudera CM on 2013-08-09T06:59:55.560Z-->
 <configuration>
   <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://127.0.0.1:3306/hive?useUnicode=true&amp;characterEncoding=UTF-8</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hive</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>******</value>
  </property>


For some reasons, the CDH 4.6 CM does not update this file when generating the Hive configuration after a CM GUI change.  This might be a fault due to the installation but it happened twice on two different clusters...

Fixing the properties file solves the problem. The proper configuration options can be found in the latest CM generated config file (e.g.):
 /run/cloudera-scm-agent/process/5011-hive-HIVEMETASTORE/hive-site.xmlHere 5011 is the latest revision number.

Copying the latest generated file is a potential solution:
 cp /run/cloudera-scm-agent/process/5011-hive-HIVEMETASTORE/hive-site.xml hive-site.xml
 # make it readable by all
 chmod 744 hive-site.xml

Updating the file manually is another.

In both cases though, the problem is that any updates in the Cloudera CM of Hive properties will not be reflected in the Hive shell.

Saturday, May 25, 2013

Of DirectBuffer, int[], GC and erratic performance

This post is inspired by Vladimir Vukicevic. The JVM used is 1.7.0_21 64 bits, I7 930 @2.80Ghz with 16 Gb of RAM and windows 8, and my reason for posting it is that some of my friends were astonished by the results.  The original code can be found on Github there.

Here is the modified code:

/**
 * VM option
 *   -verbose:gc -server -XX:+PrintCompilation
 */

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * Test direct buffer vs int[] vs List.
 */
public class NioTest {

    public static void main(String[] asArg) {
        final int MB = 1000 * 1000;
        IntBuffer buf = 
           ByteBuffer.allocateDirect(4 * 10 * MB)
              .order(ByteOrder.nativeOrder()).asIntBuffer();
        int cbMax = buf.capacity();
        int[] ab = new int[cbMax];
        // (1) 
        list = new ArrayList();         
        // end of (1)

        int x = 0;

        // how many times to run the test 
        // (to allow hotspot to optimize)
        int cIters = 200;

        for (int i = 1; i <= cIters; ++i) {
            System.out.println("*** iteration " + i);

            start("array[]: populating");
            for (int of = 0; of < cbMax; ++of) {
                ab[of] = of;
            }
            stop();

            buf.rewind();
            start("direct buffer: absolute populating");
            for (int of = 0; of < cbMax; ++of) {
                buf.put(of, of);
            }
            stop();

            buf.rewind();
            start("direct buffer: relative populating");
            for (int of = 0; of < cbMax; ++of) {
                buf.put(of, of);
            }
            stop();

            /**
              * Populate the array list.
              * Another small addition -> (2)
              */
            start("list: populating");
            list.clear();
            for (int of = 0; of < cbMax; ++of) {
                list.add(of);
            }
            stop();
            // end of (2)

            start("array[]: accessing");
            for (int of = 0; of < cbMax; ++of) {
                x |= ab[of];
            }
            stop();

            buf.rewind();
            start("direct buffer: absolute accessing");
            for (int of = 0; of < cbMax; ++of) {
                x |= buf.get(of);
            }
            stop();

            buf.rewind();
            start("direct buffer: relative accessing");
            for (int of = 0; of < cbMax; ++of) {
                x |= buf.get();
            }
            stop();
            
            /**
              * Direct access in an array list.
              * Another small addition -> (3)
              */
            start("list: direct accessing");
            for (int of = 0; of < cbMax; ++of) {
                x |= list.get(of);
            }
            stop();
            // end of (3)

        }

        // to avoid hotspot completely discarding some 
        // of the loops,
        // by using the variable "x" it can't optimize out 
        // the access inside the loop
        System.out.println("*** Test completed (x=" + x + ")");
    }

    /**
     * Start a test.
     *
     * @param sDesc the test description
     */
    public static void start(String sDesc) {
        s_sDesc = sDesc;
        s_lStart = System.currentTimeMillis();
    }

    /**
     * Finish a test and print the elapsed time.
     */
    public static void stop() {
        long lStop = System.currentTimeMillis();
        System.out.println(s_sDesc + "=" + (lStop - s_lStart) + "ms");
    }

    /**
     * Time at which the current test started.
     */
    static long s_lStart;

    /**
     * Description of the current test.
     */
    static String s_sDesc;
}

Before sharing the result, if the added sections (1), (2) and (3) are removed, the output is:
 
    68    1             java.lang.String::hashCode (55 bytes)
    70    2             java.lang.String::charAt (29 bytes)
*** iteration 1
    103    1 %           NioTest::main @ 80 (323 bytes)
array[]: populating=30.573388ms
    135    3             java.nio.Buffer::checkIndex (22 bytes)
    135    4             java.nio.DirectIntBufferU::put (18 bytes)
    135    6     n       sun.misc.Unsafe::putInt (0 bytes)   
    136    5             java.nio.DirectIntBufferU::ix (10 bytes)
direct buffer: absolute populating=59.400435ms
direct buffer: relative populating=57.650079ms
array[]: accessing=6.619388ms
    259    7             java.nio.DirectIntBufferU::get (16 bytes)
    259    8     n       sun.misc.Unsafe::getInt (0 bytes)   
  direct buffer: absolute accessing=45.15859ms
    305    9             java.nio.DirectIntBufferU::get (15 bytes)
    305   10             java.nio.Buffer::nextGetIndex (31 bytes)
direct buffer: relative accessing=46.361253ms
*** iteration 2
array[]: populating=8.189489ms
direct buffer: absolute populating=57.800777ms
direct buffer: relative populating=57.850401ms
array[]: accessing=6.623036ms
direct buffer: absolute accessing=42.51609ms
direct buffer: relative accessing=44.662711ms
[... repeat 197 times ...]
*** iteration 200
array[]: populating=8.223424ms
direct buffer: absolute populating=57.604104ms
direct buffer: relative populating=57.605928ms
array[]: accessing=6.539113ms
direct buffer: absolute accessing=41.6057ms
direct buffer: relative accessing=44.483917ms

It takes 5 iterations to finish all compilations...
 
*** iteration 5
array[]: populating=8.766374ms
direct buffer: absolute populating=57.51799ms
   1069   11             java.lang.String::indexOf (70 bytes))

With (1), (2) and (3) disabled, there are no GC pause triggered which in itself is quite normal.  A simple int[] is an order faster than the direct buffer. All in all the access time is quite good.

The surprise comes in when the List<Integer> is added to the code.  With (1), (2) and (3) enabled, a standard iteration looks like this:

*** iteration 200
array[]: populating=8.548172ms
direct buffer: absolute populating=46.513045ms
direct buffer: relative populating=47.481086ms
list: populating=90.392347ms
array[]: accessing=6.733597ms
direct buffer: absolute accessing=38.527891ms
direct buffer: relative accessing=39.543368ms
list: direct accessing=59.072768ms

Adding Integers to the List is obviously slow: 2 times more than direct buffer and 15 times slower than primitive int.  Accessing elements in the list is also slower: 1.5 times the direct buffer, and 9 times slower than the int[].

But, what is interesting is that even before the first iteration is started, a Full GC is triggered.

Here are the logs from start till end of the second (2nd) iterations:
 
     66    1             java.lang.String::hashCode (55 bytes)
     69    2             java.lang.String::charAt (29 bytes)
*** iteration 1
    103    1 %           NioTest::main @ 89 (418 bytes)
    124    1 %           NioTest::main @ -2 (418 bytes)   made not entrant
    124    2 %           NioTest::main @ 89 (418 bytes)
array[]: populating=47.584349ms
    152    3             java.nio.Buffer::checkIndex (22 bytes)
    152    4             java.nio.DirectIntBufferU::put (18 bytes)
    152    6     n       sun.misc.Unsafe::putInt (0 bytes)   
    153    5             java.nio.DirectIntBufferU::ix (10 bytes)
direct buffer: absolute populating=59.59346ms
direct buffer: relative populating=56.909727ms
    267    2 %           NioTest::main @ -2 (418 bytes)   made not entrant
    269    7             java.lang.Object:: (1 bytes)
    270    8             java.lang.Number:: (5 bytes)
    270    9             java.lang.Integer:: (10 bytes)
    270   10             java.util.ArrayList::add (29 bytes)
    271   11             java.util.ArrayList::ensureCapacityInternal (26 bytes)
    271   12             java.lang.Integer::valueOf (54 bytes)
    272    3 %           NioTest::main @ 200 (418 bytes)
[GC 64139K->55315K(251136K), 0.0236128 secs]
[GC 109374K->92973K(316672K), 0.0642352 secs]
[GC 224045K->173425K(316672K), 0.1279372 secs]
[Full GC 173425K->173131K(519872K), 3.4242516 secs]
list: populating=3821.272896ms
array[]: accessing=6.718271ms
4097   13             java.nio.DirectIntBufferU::get (16 bytes)
4098   14     n       sun.misc.Unsafe::getInt (0 bytes)   
direct buffer: absolute accessing=40.168782ms
4138   15             java.nio.DirectIntBufferU::get (15 bytes)
4138   16             java.nio.Buffer::nextGetIndex (31 bytes)
direct buffer: relative accessing=41.217827ms
4179   17             java.util.ArrayList::get (11 bytes)
4179   18             java.util.ArrayList::rangeCheck (22 bytes)
4179   19             java.util.ArrayList::elementData (7 bytes)
list: direct accessing=66.404046ms
*** iteration 2
array[]: populating=9.818703ms
direct buffer: absolute populating=47.35593ms
direct buffer: relative populating=47.253397ms
4350    4 %           java.util.ArrayList::clear @ 12 (39 bytes)
[GC 304203K->292317K(571520K), 0.4874512 secs]
[Full GC 292317K->111901K(627136K), 0.7334851 secs]
list: populating=1310.247517ms
array[]: accessing=6.688716ms
direct buffer: absolute accessing=38.789879ms
direct buffer: relative accessing=39.294516ms
list: direct accessing=60.193696ms

In that short time, there are two(2) full GCs pauses.  And one full pause just triggered by this snippet of code.

List list = new ArrayList();

Quite impressive...

Note that no other full GC will occur after those two. Some small GC collections are triggered at regular intervals, and such an iteration will look like this:
 
*** iteration 74
array[]: populating=8.292387ms
direct buffer: absolute populating=46.7203ms
direct buffer: relative populating=47.239167ms
[GC 1237773K->300333K(1635456K), 1.1714739 secs]
list: populating=1257.412101ms
array[]: accessing=6.637996ms
direct buffer: absolute accessing=38.250944ms
direct buffer: relative accessing=39.295611ms
list: direct accessing=62.972299ms

The GC kicks in when objects are added to the list.  Which sounds quite normal.  But, as a result, while populating the list is, for most iterations, within 86-90 ms, due to the GC that operation (or list of operations) can reach up to 1146.58 ms even after many iterations.

Example:
 
*** iteration 181
array[]: populating=8.487601ms
direct buffer: absolute populating=48.086431ms
direct buffer: relative populating=48.309376ms
[GC 1224941K->287437K(1632640K), 1.0567382 secs]
list: populating=1146.593101ms

It makes me think about when to use lists and Integer versus primitive types an arrays...

Tuesday, April 16, 2013

Ubuntu - Install Oracle JDK

Yes, there are also PPA, and etc...  But I wanted to write it down once for me...



Install the JDK
  • Download the 32bit or 64bit Linux "compressed binary file" - it has a ".tar.gz" file extension i.e. "[java-version]-i586.tar.gz" for 32bit and "[java-version]-x64.tar.gz" for 64bit
  • Uncompress it
    tar -xvf jdk-7u17-i586.tar.gz (32bit)
    tar -xvf jdk-7u17-linux-x64.tar.gz (64bit)
JDK 7 package is extracted into ./jdk1.7.0_17 directory. N.B. check carefully this folder name since Oracle seem to change this occasionally.
  • Now move the JDK 7 directory to /usr/lib
sudo mkdir -p /usr/lib/jvm
sudo mv ./jdk.1.7.0_17 /usr/lib/jvm/oracle_jdk7
  • Run
sudo update-alternatives --install "/usr/bin/java" "java" "/usr/lib/jvm/oracle_jdk7/bin/java" 1
sudo update-alternatives --install "/usr/bin/javac" "javac" "/usr/lib/jvm/oracle_jdk7/bin/javac" 1
sudo update-alternatives --install "/usr/bin/javaws" "javaws" "/usr/lib/jvm/oracle_jdk7/bin/javaws" 1
  • Correct the permissions of the executables:
sudo chmod a+x /usr/bin/java 
sudo chmod a+x /usr/bin/javac 
sudo chmod a+x /usr/bin/javaws
N.B. remember - Java JDK has many more executables that you can similarly install as above.javajavacjavaws are probably the most frequently required.
This answer lists the other executables available.
  • Run
sudo update-alternatives --config java
You will see output similar one below - choose the number matching oracle_jdk7 - for example 3 in this list:
$sudo update-alternatives --config java
There are 3 choices for the alternative java (providing /usr/bin/java).

Selection Path Priority Status
————————————————————
* 0 /usr/lib/jvm/java-6-openjdk/jre/bin/java 1061 auto mode
1 /usr/lib/jvm/java-6-openjdk/jre/bin/java 1061 manual mode
2 /usr/lib/jvm/java-6-sun/jre/bin/java 63 manual mode
3 /usr/lib/jvm/oracle_jdk7/jre/bin/java 3 manual mode

Press enter to keep the current choice[*], or type selection number: 3
update-alternatives: using /usr/lib/jvm/oracle_jdk7/jre/bin/java to provide /usr/bin/java (java) in manual mode.
Check the version of you new JDK 7 installation:
java -version
java version “1.7.0”
Java(TM) SE Runtime Environment (build 1.7.0_17-b02)
Java HotSpot(TM) Client VM (build 23.7-b01, mixed mode) 
Repeat the above for:
sudo update-alternatives --config javac
sudo update-alternatives --config javaws
Enable mozilla firefox plugin:
32 bit:
ln -s /usr/lib/jvm/oracle_jdk7/jre/lib/i386/libnpjp2.so ~/.mozilla/plugins/

64 bit:
ln -s /usr/lib/jvm/oracle_jdk7/jre/lib/amd64/libnpjp2.so ~/.mozilla/plugins/
N.B. you can link the plugin (libnpjp2.so) to /usr/lib/firefox/plugins/ for a system wide installation

Sunday, April 14, 2013

Software and scalability: It is all a numbers game.

A fascinating talk by Martin Thompson on making scalable software:

It's all a numbers game -- the dirty little secret of scalable systems by Martin Thompson




My highlights from the talk...

At a high level:
  • Be simple
  • Share Nothing Architecture
  • Profile the code once a week.
  • Separate reads from writes
  • Know the platform
  • Use cache oblivious algorithms
From a pattern perspective:
  • Use the "Single Writer Principle": Any item of data, or resource, is only mutated by a single writer/thread.  It is OK if multiple threads, or other execution contexts, read the same data. CPUs can broadcast read only copies of data to other cores via the cache coherency sub-system. This has a cost but it scales very well.
  • Disruptor vs queues: See the LMAX project on GitHub (http://lmax-exchange.github.io/disruptor/). Associated technical paper http://disruptor.googlecode.com/files/Disruptor-1.0.pdf.
  • The "Curse of Logging": Most common Java Logging libraries take way too many CPU cycles from the application (Log4j, JUL, Logback, etc.). 
  • Command Query Responsibility Segregation (CQRS):  Split the conceptual model into separate models for update and display.  This refers to as Command and Query respectively following the vocabulary of CommandQuerySeparation
Read Martin's blog.

Saturday, March 30, 2013

An Intro to ZeroMQ(ØMQ) On Ubuntu 12.04

What is ØMQ?


ØMQ (also spelled as ZeroMQ, 0MQ, zmq) is an open source (LGPL), brokerless high-performance asynchronous messaging library. It is main aim is for scalable distributed or concurrent applications   It gives you sockets that carry atomic messages across various transports like in-process, inter-process, TCP, and multicast (N-to-N with patterns like fanout, pub-sub, task distribution, and request-reply). The library is designed to have a familiar socket-style API.  

It is easy to use. Simply reference the ØMQ library, and you can happily send messages between your applications.

It's fast (see the following post Message Queue Shootout!).  



It is available in many languages (bindings) in C++, C#, Clojure, CL, Delphi, Erlang, F#, Felix,  Go, Haskell, Haxe, Java, Lua, Node.js, Objective-C, Perl, PHP, Python, Q, Racket, Ruby, Scala, Tcl, Ada, Basic, ooc.  But written in C++.

It is brokerless i.e. ØMQ does not require any server process. In effect, your application endpoints play the server role. This makes deployment simpler, but there are no obvious place to look at when things go wrong.  A simple example of this problem is to kill the server example in the "Hello World" below while the client is running. The client will hang forever (... I only waited a few minutes...) even when the server is restarted.

It is not part of the Advanced Message Queuing Protocol (AMQP) group of Message Brokers like RabbitMQ or ActiveMQ, and in a lots of ways is a different beast from those "traditional" messaging systems.  IMO, only Kafka can be considered similar at least in terms of performance but even then the comparison is skewed as ØMQ has no guarantee of delivery (non-durable queues).


Ubuntu Installation

There is a PPA available on the web (https://launchpad.net/~chris-lea/+archive/zeromq) for 12.04 (Precise):
deb http://ppa.launchpad.net/chris-lea/zeromq/ubuntu precise main 
deb-src http://ppa.launchpad.net/chris-lea/zeromq/ubuntu precise main 

Which I did not try since I used the tar ball available on the ØMQ download page.  

To grab the release, in this case 3.2.2:

    wget http://download.zeromq.org/zeromq-3.2.2.tar.gz
    tar -zxf zeromq-3.2.2.tar.gz
    cd zeromq-3.2.2

The ØMQ build instructions informs you that the following dependencies are required libtoolautoconfautomake as well as uuid-dev, and of course build-essential.

    sudo apt-get install libtool autoconf automake uuid-dev build-essential

Build and install:

    ./configure
    make
   # To install system wide
   sudo make install
   # Run ldconfig to create teh necessary links
   sudo ldconfig

To see all configuration options, run ./configure --help. Read INSTALL for more details.

A "Hello World" example

This is well documented on the ØMQ guide. Using the C example, first, the client.c file (from the ØMQ guide):

// Hello World client
#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
int main (void)
{
printf ("Connecting to hello world server…\n");
void *context = zmq_ctx_new ();
void *requester = zmq_socket (context, ZMQ_REQ);
zmq_connect (requester, "tcp://localhost:5555");
int request_nbr;
for (request_nbr = 0; request_nbr != 10; request_nbr++) {
char buffer [10];
printf ("Sending Hello %d…\n", request_nbr);
zmq_send (requester, "Hello"50);
zmq_recv (requester, buffer, 100);
printf ("Received World %d\n", request_nbr);
}
zmq_close (requester);
zmq_ctx_destroy (context);
return 0;
}

A slightly modified server.c file that indicates the ØMQ version at startup:

// Hello World server
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
int main (void)
{
    int major, minor, patch;
zmq_version (&major, &minor, &patch);
printf ("Current 0MQ version is %d.%d.%d\n", major, minor, patch);

// Socket to talk to clients
void *context = zmq_ctx_new ();
void *responder = zmq_socket (context, ZMQ_REP);
int rc = zmq_bind (responder, "tcp://*:5555");
assert (rc == 0);

    printf("0MQ server start at %d.", 5555);
while (1) {
char buffer [10];
zmq_recv (responder, buffer, 100);
printf ("Received Hello\n");
zmq_send (responder, "World"50);
sleep (1); // Do some 'work'
}
return 0;
} 

The only thing to remember when compiling is to specify the linked library (g++/gcc linking refresher) when compiling:
  
    g++ -0 client client.c -lzmq
    g++ -0 server server.c -lzmq

Start the server, then the client:

    ./server
    ./client

The client will send 10 "Hello"s to the server that will reply with "World".

That is it.

More info: http://www.zeromq.org/. Now, read the guide

Some interesting links: 

Wednesday, March 20, 2013

Installing Ganglia 3.x on Ubuntu 12.04.1

While Ganglia is a well known monitoring solution, it was difficult to find and regroup information for a successful and clean install on a 12.04.1 Ubuntu cluster of 30 nodes.  In particular, linking the UI and Apache was a bit hackish until I found the symbolic link trick (ie the alias).

What is Ganglia?


Ganglia is BSD-licensed open-source project implementing a scalable distributed monitoring system for high-performance computing systems such as clusters and Grids. It leverages widely used technologies such as XML for data representation, XDR for compact, portable data transport, and RRD tool for data storage and visualization. It has very low per-node overheads and high concurrency. The implementation is robust, has been ported to an extensive set of operating systems and processor architectures, and is currently in use on thousands of clusters around the world.

Overview/Concepts


Major components

There are 3 main parts to Ganglia: The nodes generating the statistics, the node(s) collecting the statistics, and a web front-end for displaying them.

  • gmond: A process running on the monitored nodes. For Ubuntu, this is installed with the ganglia-monitor package:
sudo apt-get install ganglia-monitor
        Configuration in /etc/gmond.conf, and associated service is ganglia-monitor.
  • gmetad: A process running on one or multiple machines that collects the statistics sent by the various gmond processes in the associated grid. For Ubuntu, this is the package ganglia-webfrontend package:
         sudo apt-get install ganglia-webfrontend
        Configuration in /etc/gmetad.conf, and associated service is gmetad.
  • A web UI: The web front end is installed/contained within the same package as gmetad. The UI is used to display the collected data. 
One of the tricky things about Ganglia is how it connects to itself.  Any gmond processes sends its collected information towards a gmetad.  Generally, the  collector node(s) is both a node that will generate statistics and a collection point. Such a node will run the the same daemon gmond but will be tweaked to also run the gmetad daemon. In turn, the gmetad daemon provides data to the web front end.

On a multi-nodes deployment, it is possible to verify (or test) the gmond process and to receive an XML dump:
telnet localhost 8649
Note: The above example uses the default port.  This port can be changed in /etc/gmond.conf.

Installation

On each Ubuntu nodes used to collect statistics:
# Install service ganglia-monitor -> gmond
sudo apt-get install ganglia-monitor 

For the Ubuntu node that collects data and that is used to run the web UI:
# Install ganglia frontend + gmetad
sudo apt-get install ganglia-webfrontend

Note that the above is not sufficient for ganglia to be accessible via HTTP.  The Ganglia UI must be deployed within a web application server (e.g. Apache tomcat) to be accessible.  

Note:  Apache tomcat is installed as a dependency when the ganglia-frontend package is installed. It it also possible to install it separately by entering the following command:
sudo apt-get install apache2
In order to deploy and run Ganglia in Apache tomcat, it is required to copy the apache.conf file from /etc/ganglia-webfrontend/apache.conf to /etc/apache2/sites-enabled/:
sudo cp /etc/ganglia-webfrontend/apache.conf /etc/apache2/sites-enabled/ganglia.conf
Note: The file is renamed to clarify what it is used for.

The /etc/ganglia-webfrontend/apache.conf contains a simple alias for /ganglia towards /sur/share/ganglia-webfrontend.

In my opinion, it is simpler and cleaner to create a symbolic link from etc/ganglia-webfrontend/apache.conf to /etc/apache2/sites-enabled/ such as:
cd /etc/apache2/sites-enabled
sudo ln -s /etc/ganglia-webfrontend/apache.conf ganglia.conf
sudo service apache2 stop
sudo service apache2 start

With the above, and the Apache tomcat server running, the Ganglia UI is available at: <host ip>/ganglia.

Stop/start

Uing Ubuntu services:
sudo service gmetad start
sudo service ganglia-monitor start

Monitoring hadoop 1.0.4

Ganglia can be used to monitor a Hadoop cluster.

In file  $HADOOP_HOME/conf/hadoop-metrics2.properties, enable the various sink properties referring to ganglia (remove '#'). Near the end of the file, when specifying the addresses to sink to, use the same address as the mcast_join value found in /etc/gmond.conf, i.e if using multicast:

namenode.sink.ganglia.servers=239.2.11.71:8649
datanode.sink.ganglia.servers=239.2.11.71:8649
tasktracker.sink.ganglia.servers=239.2.11.71:8649
...

Monitoring HBase 0.94.5

Ganglia can also be used to monitor an HBase cluster.

In the file $HBASE_HOME/conf/hadoop-metrics.properties, enable the various configuration options for ganglia.

In the example below, ganglia 3.1.7 is used, and ganglia collects data using multicast (mcast_join value set to =239.2.11.71):

    hbase.class=org.apache.hadoop.metrics.ganglia.GangliaContext31
  hbase.period=10
  hbase.servers=239.2.11.71:8649
  jvm.class=org.apache.hadoop.metrics.ganglia.GangliaContext31
  jvm.period=10
  jvm.servers=239.2.11.71:8649

References