Thorough Introduction to Apache Kafka

Introduction

Kafka is a word that gets heard a lot nowadays… A lot of leading digital companies seem to use it as well. But what is it actually?

Kafka was originally developed at LinkedIn, open-sourced in 2011, and has improved a lot since then. Nowadays it is a whole platform that lets you redundantly store absurd amounts of data, run a message bus with huge throughput (millions of messages per second) and do real-time stream processing on the data that flows through it, all at once.

This is all well and great, but stripped down to its core, Kafka is a distributed, horizontally-scalable, fault-tolerant commit log.

Those were some fancy words; let's go through them one by one and see what they mean. Afterwards, we will dive deep into how it works.

Distributed

A distributed system is one that is split across multiple machines, all of which work together in a cluster to appear as one single node to the end user. Kafka is distributed in the sense that it stores, receives and sends messages on different nodes (called brokers).

The benefits to this approach are high scalability and fault-tolerance.

Horizontally-scalable

Let's define the term vertical scalability first. Say, for instance, you have a traditional database server which is starting to get overloaded. The way to solve this is to simply increase the resources (CPU, RAM, SSD) on the server. This is called vertical scaling: you add more resources to the machine. There are two big disadvantages to scaling upwards:

  1. There are limits defined by the hardware. You cannot scale upwards indefinitely.
  2. It usually requires downtime, something which big corporations cannot afford.

Horizontal scalability solves the same problem by throwing more machines at it. Adding a new machine does not require downtime, nor are there any limits to the number of machines you can have in your cluster. The catch is that not all systems support horizontal scalability; they are not designed to work in a cluster, and those that are tend to be more complex to work with.

Horizontal scaling becomes much cheaper after a certain threshold

Fault-tolerant

Something that emerges in non-distributed systems is that they have a single point of failure (SPoF). If your single database server fails (as machines do) for whatever reason, you’re screwed.

Distributed systems are designed in such a way as to accommodate failures in a configurable way. In a 5-node Kafka cluster, you can have it continue working even if 2 of the nodes are down. It is worth noting that fault-tolerance is a direct tradeoff with performance: the more fault-tolerant your system is, the less performant it is.

Commit Log

A commit log (also referred to as a write-ahead log or transaction log) is a persistent, ordered data structure that only supports appends. You cannot modify or delete records from it. It is read from left to right and guarantees item ordering.
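To make the idea concrete, here is a toy sketch in Java of such a structure (an illustration only, not Kafka's actual implementation): records can only be appended, every append returns the record's position (its offset), and existing records are never modified.

```java
import java.util.ArrayList;
import java.util.List;

// A toy append-only commit log, for illustration purposes only.
class CommitLog<T> {
    private final List<T> records = new ArrayList<>();

    // Append a record to the end of the log and return its offset.
    public synchronized long append(T record) {
        records.add(record);
        return records.size() - 1;
    }

    // Read the record stored at a given offset; records are never modified or deleted.
    public synchronized T read(long offset) {
        return records.get((int) offset);
    }
}
```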

Sample illustration of a commit log

– Are you telling me that Kafka is such a simple data structure?

In many ways, yes. This structure is at the heart of Kafka and is invaluable, as it provides ordering, which in turn provides deterministic processing, both of which are non-trivial problems in distributed systems.

Kafka actually stores all of its messages to disk (more on that later) and having them ordered in the structure lets it take advantage of sequential disk reads.

  • Reads and writes are constant time O(1) (given the record ID), which, compared to other structures' O(log N) operations on disk, is a huge advantage, as each disk seek is expensive.
  • Reads and writes do not affect one another. Writing does not lock reading and vice versa (as opposed to balanced trees).

These two points have huge performance benefits, since the data size is completely decoupled from performance. Kafka has the same performance whether you have 100KB or 100TB of data on your server.

How does it work?

Applications (producers) send messages (records) to a Kafka node (broker), and those messages are processed by other applications called consumers. The messages get stored in a topic and consumers subscribe to the topic to receive new messages.

As topics can get quite big, they get split into smaller partitions for better performance and scalability. (For example, say you were storing user login requests; you could split them by the first character of the user's username.)
Kafka guarantees that all messages inside a partition are ordered in the sequence they came in. The way you identify a specific message is through its offset, which you can think of as a normal array index: a sequence number that is incremented for each new message in a partition.
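As a rough illustration of what producing a record looks like with the official Java client, here is a minimal sketch; the broker address, topic name, key and value are made-up examples.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class LoginProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // example broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Records with the same key always land in the same partition,
            // which is what preserves per-key ordering.
            producer.send(new ProducerRecord<>("user-logins", "alice", "login at 10:00"));
        }
    }
}
```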

Kafka follows the principle of a dumb broker and smart consumer. This means that Kafka does not keep track of which records have been read by the consumer and delete them; rather, it stores them for a set amount of time (e.g. one day) or until some size threshold is met. Consumers themselves poll Kafka for new messages and say which records they want to read. This allows them to increment/decrement the offset they're at as they wish, thus being able to replay and reprocess events.
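On the consuming side, a minimal sketch (again with made-up broker address, topic and group names) showing the poll loop and where a consumer could rewind its offset to replay old events:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LoginConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // example broker address
        props.put("group.id", "login-processors");        // example consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("user-logins"));
            while (true) {
                // The consumer polls the broker and keeps track of its own offset.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
                // To replay everything from the start, the consumer could rewind its offsets:
                // consumer.seekToBeginning(consumer.assignment());
            }
        }
    }
}
```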

It is worth noting that consumers are really consumer groups, which have one or more consumer processes inside. In order to avoid two processes in a group reading the same message twice, each partition is tied to only one consumer process per group.

Representation of the data flow

Persistence to Disk

As I mentioned earlier, Kafka actually stores all of its records to disk and does not keep anything in RAM. You might be wondering how this is in the slightest way a sane choice. There are numerous optimizations behind this that make it feasible:

  1. Kafka has a protocol that groups messages together. This lets network requests bundle messages and reduce network overhead; the server in turn persists chunks of messages in one go, and consumers fetch large linear chunks at once (see the producer configuration sketch below).
  2. Linear reads/writes on a disk are fast. The perception that modern disks are slow comes from disk seeks, which are not an issue in big linear operations.
  3. Said linear operations are heavily optimized by the OS, via read-ahead (prefetching large block multiples) and write-behind (grouping small logical writes into big physical writes) techniques.
  4. Modern OSes cache the disk in free RAM. This is called pagecache.
  5. Since Kafka stores messages in a standardized binary format unmodified throughout the whole flow (producer -> broker -> consumer), it can make use of the zero-copy optimization. That is when the OS copies data from the pagecache directly to a socket, effectively bypassing the Kafka broker application entirely.

All of these optimizations allow Kafka to deliver messages at near network speed.
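As a sketch of the batching point above, these are the producer settings that control how records get grouped into batches; the values shown are purely illustrative and would slot into the Properties of the producer example shown earlier.

```java
// Batching-related producer settings (illustrative values, not recommendations).
props.put("batch.size", 65536);        // accumulate up to 64 KB of records per partition batch
props.put("linger.ms", 10);            // wait up to 10 ms for a batch to fill before sending
props.put("compression.type", "lz4");  // compress whole batches on the wire and on disk
```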

Data Distribution & Replication

Let’s talk about how Kafka achieves fault-tolerance and how it distributes data between nodes.

Data Replication

Partition data is replicated across multiple brokers in order to preserve the data in case one broker dies.

At all times, one broker "owns" a partition and is the node through which applications write to and read from the partition. This is called the partition leader. It replicates the data it receives to N other brokers, called followers. They store the data as well and are ready to be elected as leader in case the leader node dies.

This is how you configure the guarantee that any successfully published message will not be lost. Having the option to change the replication factor lets you trade performance for stronger durability guarantees, depending on the criticality of the data.
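As a sketch of how this looks in practice (topic name, partition count and broker address are made up), a topic can be created with a replication factor of 3 through the AdminClient:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateReplicatedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // example broker address

        try (AdminClient admin = AdminClient.create(props)) {
            // 6 partitions, each replicated to 3 brokers (1 leader + 2 followers).
            NewTopic topic = new NewTopic("user-logins", 6, (short) 3);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
```

On the producer side, setting acks=all additionally makes the leader wait for the in-sync replicas to acknowledge a write before confirming it, trading some latency for stronger durability.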

 
4 Kafka brokers with a replication factor of 3

In this way, if one leader ever fails, a follower can take its place.

You may be asking, though:

– How does a producer/consumer know who the leader of a partition is?

For a producer/consumer to write/read from a partition, they need to know its leader, right? This information needs to be available from somewhere.
Kafka stores such metadata in a service called Zookeeper.

What is Zookeeper?

Zookeeper is a distributed key-value store. It is highly optimized for reads, but writes are slower. It is most commonly used to store metadata and handle the mechanics of clustering (heartbeats, distributing updates/configurations, etc).

It allows clients of the service (the Kafka brokers) to subscribe and have changes sent to them once they happen. This is how brokers know when to switch partition leaders. Zookeeper is also extremely fault-tolerant and it ought to be, as Kafka heavily depends on it.

It is used for storing all sorts of metadata, to mention some:

  • Consumer groups' offsets per partition (although modern clients store offsets in a separate Kafka topic)
  • ACL (Access Control Lists) — used for limiting access/authorization
  • Producer & Consumer Quotas — maximum messages/sec boundaries
  • Partition Leaders and their health

How does a producer/consumer know who the leader of a partition is?

Producers and consumers used to connect directly to Zookeeper to get this (and other) information. Kafka has been moving away from this coupling, and since versions 0.8 and 0.9 respectively, clients fetch metadata information directly from Kafka brokers, which themselves talk to Zookeeper.
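A small sketch of how a client can discover partition leaders today, by asking any broker for topic metadata (the broker address and topic name are again made up):

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

public class ShowPartitionLeaders {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // any broker works as a bootstrap

        try (AdminClient admin = AdminClient.create(props)) {
            Map<String, TopicDescription> topics =
                    admin.describeTopics(Collections.singletonList("user-logins")).all().get();
            topics.get("user-logins").partitions().forEach(p ->
                    System.out.printf("partition %d -> leader broker %s%n",
                            p.partition(), p.leader()));
        }
    }
}
```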

 
Metadata Flow

Streaming

In Kafka, a stream processor is anything that takes continual streams of data from input topics, performs some processing on this input and produces a stream of data to output topics (or external services, databases, the trash bin, wherever really…)

It is possible to do simple processing directly with the producer/consumer APIs; however, for more complex transformations like joining streams together, Kafka provides an integrated Streams API library.

This API is intended to be used within your own codebase; it does not run on a broker. It works similarly to the consumer API and helps you scale out the stream processing work over multiple applications (similar to consumer groups).

Stateless Processing

Stateless processing of a stream is deterministic processing that does not depend on anything external. You know that for any given input you will always produce the same output, independent of anything else. An example would be a simple data transformation, like appending something to a string: "Hello" -> "Hello, World!".
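A minimal Kafka Streams sketch of exactly that transformation; the application id, broker address and topic names are placeholders. Note that, as described above, this runs inside your own application rather than on a broker.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class HelloWorldStream {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hello-world-app"); // example app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Stateless transformation: each record is mapped independently of any state.
        builder.<String, String>stream("greetings-input")
               .mapValues(value -> value + ", World!")
               .to("greetings-output");

        new KafkaStreams(builder.build(), props).start();
    }
}
```

Running several instances of this application with the same application.id splits the input partitions among them, which is how the Streams API scales out in the same way consumer groups do.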

 

Stream-Table Duality

It is important to recognize that streams and tables are essentially the same. A stream can be interpreted as a table and a table can be interpreted as a stream.

Stream as a Table

If you look at how synchronous database replication is achieved, you’ll see that it is through the so-called streaming replication, where each change in a table is sent to a replica server. A Kafka stream can be interpreted in the same way — as a stream of updates for data, in which the aggregate is the final result of the table. Such streams get saved in a local RocksDB (by default) and are called a KTable.

 
Each record increments the aggregated count
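A sketch of such a stream-to-table aggregation with the Streams API (the topic name is a placeholder); this fragment would sit inside a Streams application configured like the one shown earlier.

```java
StreamsBuilder builder = new StreamsBuilder();

// A stream of login events keyed by user id...
KStream<String, String> logins = builder.stream("user-logins");

// ...interpreted as a table: the ever-updating count of logins per user.
// By default the aggregate is kept in a local RocksDB store.
KTable<String, Long> loginCounts = logins.groupByKey().count();
```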

Table as a Stream

A table can be looked at as a snapshot of the latest value for each key in a stream. In the same way stream records can produce a table, table updates can produce a changelog stream.

 
Each update produces a snapshot record in the stream

Stateful Processing

Some simple operations like map() or filter() are stateless and do not require you to keep any data regarding the processing. However, in real life, most operations you'll do will be stateful (e.g. count()) and as such will require you to store the currently accumulated state.

The problem with maintaining state on stream processors is that the stream processors can fail! Where would you need to keep this state in order to be fault-tolerant?

A naive approach is to simply store all state in a remote database and join over the network to that store. The problem with this is that there is no locality of data and lots of network round-trips, both of which will significantly slow down your application. A more subtle but important problem is that your stream processing job’s uptime would be tightly coupled to the remote database and the job will not be self-contained (a change in the database from another team might break your processing).

So what’s a better approach?
Recall the duality of tables and streams. This allows us to convert streams into tables that are co-located with our processing. It also provides us with a mechanism for handling fault tolerance — by storing the streams in a Kafka broker.

A stream processor can keep its state in a local table (e.g. RocksDB), which will be updated from an input stream (after perhaps some arbitrary transformation). When the process fails, it can restore its data by replaying the stream.
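A sketch of the same idea in the Streams API (topic and store names are placeholders): the count is kept in a named local store, and Kafka Streams backs that store with a changelog topic in Kafka, so a restarted instance can rebuild it by replaying the stream.

```java
StreamsBuilder builder = new StreamsBuilder();

// The per-key count lives in a named local state store (RocksDB by default).
// Every update is also written to a changelog topic, so a failed instance
// can restore the store by replaying that stream.
KTable<String, Long> counts = builder.<String, String>stream("user-logins")
        .groupByKey()
        .count(Materialized.as("login-counts-store"));
```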

You could even have a remote database be the producer of the stream, effectively broadcasting a changelog with which you rebuild the table locally.

Stateful processing, joining a KStream with a KTable
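A sketch of such a join (topic names and the enrichment logic are made up); like the previous fragments, it would live inside a Streams application configured as before.

```java
StreamsBuilder builder = new StreamsBuilder();

// A stream of orders and a table of user profiles, both keyed by user id.
KStream<String, String> orders = builder.stream("orders");
KTable<String, String> profiles = builder.table("user-profiles");

// Each order is enriched with the latest known profile for its user.
orders.join(profiles, (order, profile) -> order + " placed by " + profile)
      .to("enriched-orders");
```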

KSQL

Normally, you’d be forced to write your stream processing in a JVM language, as that is where the only official Kafka Streams API client is.

 
Sample KSQL setup

Currently in a developer preview, KSQL is a new feature which allows you to write your simple streaming jobs in a familiar SQL-like language.

You set up a KSQL server and interactively query it through a CLI to manage the processing. It works with the same abstractions (KStream & KTable), guarantees the same benefits of the Streams API (scalability, fault-tolerance) and greatly simplifies work with streams.

This might not sound like a lot, but in practice it is way more useful for testing things out, and it even allows people outside of development (e.g. product owners) to play around with stream processing. I encourage you to take a look at the quick-start video and see how simple it is.

Streaming alternatives

Kafka Streams is a perfect mix of power and simplicity. It arguably has the best capabilities for stream jobs on the market and it integrates with Kafka far more easily than other stream processing alternatives (Storm, Samza, Spark, Wallaroo).

The problem with most other stream processing frameworks is that they are complex to work with and deploy. A batch processing framework like Spark needs to:

  • Control a large number of jobs over a pool of machines and efficiently distribute them across the cluster.
  • To achieve this it has to dynamically package up your code and physically deploy it to the nodes that will execute it. (along with configuration, libraries, etc.)

Unfortunately tackling these problems makes the frameworks pretty invasive. They want to control many aspects of how code is deployed, configured, monitored, and packaged.

Kafka Streams lets you roll out your own deployment strategy when you need it, be it Kubernetes, Mesos, Nomad, Docker Swarm or others.

The underlying motivation of Kafka Streams is to enable all your applications to do stream processing without the operational complexity of running and maintaining yet another cluster. The only potential downside is that it is tightly coupled with Kafka, but in the modern world, where most if not all real-time processing is powered by Kafka, that may not be a big disadvantage.


When would you use Kafka?

As we already covered, Kafka allows you to have a huge amount of messages go through a centralized medium and store them without worrying about things like performance or data loss.

This means it is perfect for use as the heart of your system's architecture, acting as a centralized medium that connects different applications. Kafka can be the centerpiece of an event-driven architecture and allows you to truly decouple applications from one another.

 

Kafka allows you to easily decouple communication between different (micro)services. With the Streams API, it is now easier than ever to write business logic which enriches Kafka topic data for service consumption. The possibilities are huge and I urge you to explore how companies are using Kafka.

Summary

Apache Kafka is a distributed streaming platform capable of handling trillions of events a day. Kafka provides low-latency, high-throughput, fault-tolerant publish and subscribe pipelines and is able to process streams of events.

We went over its basic semantics (producer, broker, consumer, topic), learned about some of its optimizations (pagecache), learned how it’s fault-tolerant by replicating data and were introduced to its powerful streaming abilities.

Kafka has seen large adoption at thousands of companies worldwide, including a third of the Fortune 500. With the continual improvement of Kafka and the recently released first major version 1.0 (1st November 2017), there are predictions that this Streaming Platform is going to be as big and central a data platform as relational databases are.

I hope that this introduction helped familiarize you with Apache Kafka and its potential.

Further Reading Resources & Things I did not mention

The rabbit hole goes deeper than this article was able to cover. Here are some features I did not get the chance to mention but are nevertheless important to know:

Connector API — API helping you connect various services to Kafka as a source or sink (PostgreSQL, Redis, ElasticSearch)

Log Compaction — An optimization which reduces log size. Extremely useful in changelog streams

Exactly-once Message Semantics — Guarantee that messages are received exactly once. This is a big deal.

Resources

Confluent Blog — a wealth of information regarding Apache Kafka

Kafka Documentation — Great, extensive, high-quality documentation

Kafka Summit 2017 videos

Thank you for taking the time to read this.

APACHE HADOOP YARN – RESOURCEMANAGER

As previously described, ResourceManager (RM) is the master that arbitrates all the available cluster resources and thus helps manage the distributed applications running on the YARN system. It works together with the per-node NodeManagers (NMs) and the per-application ApplicationMasters (AMs).

  1. NodeManagers take instructions from the ResourceManager and manage resources available on a single node.
  2. ApplicationMasters are responsible for negotiating resources with the ResourceManager and for working with the NodeManagers to start the containers.

Diagram of resource manager components

RESOURCEMANAGER COMPONENTS

The ResourceManager has the following components (see the figure above):

  1. Components interfacing RM to the clients:
    • ClientService: The client interface to the Resource Manager. This component handles all the RPC interfaces to the RM from the clients including operations like application submission, application termination, obtaining queue information, cluster statistics etc.
    • AdminService: To make sure that admin requests don't get starved by normal users' requests, and to give operators' commands higher priority, all admin operations like refreshing the node list, the queues' configuration etc. are served via this separate interface.
  2. Components connecting RM to the nodes:
    • ResourceTrackerService: This is the component that responds to RPCs from all the nodes. It is responsible for registering new nodes, rejecting requests from any invalid/decommissioned nodes, obtaining node heartbeats and forwarding them to the YarnScheduler. It works closely with NMLivelinessMonitor and NodesListManager, described below.
    • NMLivelinessMonitor: To keep track of live nodes and specifically note down dead nodes, this component tracks each node's last heartbeat time. Any node that doesn't heartbeat within a configured interval of time, by default 10 minutes, is deemed dead and is expired by the RM. All the containers currently running on an expired node are marked as dead and no new containers are scheduled on such a node.
    • NodesListManager: A collection of valid and excluded nodes. Responsible for reading the host configuration files specified via yarn.resourcemanager.nodes.include-path and yarn.resourcemanager.nodes.exclude-path and seeding the initial list of nodes based on those files. Also keeps track of nodes that are decommissioned as time progresses.
  3. Components interacting with the per-application AMs:
    • ApplicationMasterService: This is the component that responds to RPCs from all the AMs. It is responsible for registering new AMs, handling termination/unregister requests from any finishing AMs, obtaining container-allocation and deallocation requests from all running AMs and forwarding them to the YarnScheduler. It works closely with AMLivelinessMonitor, described below.
    • AMLivelinessMonitor: To help manage the list of live AMs and dead/non-responding AMs, this component keeps track of each AM and its last heartbeat time. Any AM that doesn’t heartbeat within a configured interval of time, by default 10 minutes, is deemed dead and is expired by the RM. All the containers currently running/allocated to an AM that gets expired are marked as dead. RM schedules the same AM to run on a new container, allowing up to a maximum of 4 such attempts by default.
  4. The core of the ResourceManager – the scheduler and related components:
    • ApplicationsManager: Responsible for maintaining a collection of submitted applications. Also keeps a cache of completed applications so as to serve users’ requests via web UI or command line long after the applications in question finished.
    • ApplicationACLsManager: The RM needs to gate the user-facing APIs like the client and admin requests so that they are accessible only to authorized users. This component maintains the ACL lists per application and enforces them whenever a request such as killing an application or viewing an application's status is received.
    • ApplicationMasterLauncher: Maintains a thread-pool to launch AMs of newly submitted applications as well as applications whose previous AM attempts exited due to some reason. Also responsible for cleaning up the AM when an application has finished normally or forcefully terminated.
    • YarnScheduler: The Scheduler is responsible for allocating resources to the various running applications subject to constraints of capacities, queues etc. It performs its scheduling function based on the resource requirements of the applications such as memory, CPU, disk, network etc. Currently, only memory is supported and support for CPU is close to completion.
    • ContainerAllocationExpirer: This component is in charge of ensuring that all allocated containers are used by AMs and subsequently launched on the corresponding NMs. AMs run as untrusted user code and can potentially hold on to allocations without using them, and as such can cause cluster under-utilization. To address this, the ContainerAllocationExpirer maintains the list of allocated containers that are still not used on the corresponding NMs. For any container, if the corresponding NM doesn't report to the RM that the container has started running within a configured interval of time, by default 10 minutes, the container is deemed dead and is expired by the RM.
  5. TokenSecretManagers (for security): The ResourceManager has a collection of SecretManagers which are charged with managing the tokens and secret keys that are used to authenticate/authorize requests on various RPC interfaces. A future post on YARN security will cover a more detailed description of the tokens, secret keys and secret managers, but a brief summary follows:
    • ApplicationTokenSecretManager: To prevent arbitrary processes from sending RM scheduling requests, the RM uses per-application tokens called ApplicationTokens. This component saves each token locally in memory until the application finishes and uses it to authenticate any request coming from a valid AM process.
    • ContainerTokenSecretManager: SecretManager for ContainerTokens that are special tokens issued by RM to an AM for a container on a specific node. ContainerTokens are used by AMs to create a connection to the corresponding NM where the container is allocated. This component is RM-specific, keeps track of the underlying master and secret-keys and rolls the keys every so often.
    • RMDelegationTokenSecretManager: A ResourceManager specific delegation-token secret-manager. It is responsible for generating delegation tokens to clients which can be passed on to unauthenticated processes that wish to be able to talk to RM.
  6. DelegationTokenRenewer: In secure mode, RM is Kerberos authenticated and so provides the service of renewing file-system tokens on behalf of the applications. This component renews tokens of submitted applications as long as the application runs and till the tokens can no longer be renewed.

CONCLUSION

In YARN, the ResourceManager is primarily limited to scheduling, i.e. only arbitrating available resources in the system among the competing applications and not concerning itself with per-application state management. Because of this clear separation of responsibilities, coupled with the modularity described above and the powerful scheduler API discussed in the previous post, the RM is able to address the most important design requirements: scalability and support for alternate programming paradigms.

To allow for different policy constraints, the scheduler described above in the RM is pluggable and allows for different algorithms. In a future post of this series, we will dig deeper into various features of CapacityScheduler that schedules containers based on capacity guarantees and queues.

Top Differences between Hadoop 1.0 & Hadoop 2.0


Early adopters of the Hadoop ecosystem were restricted to processing models that were MapReduce-based only. Hadoop 2 has brought with it effective processing models that lend themselves to many Big Data uses, including interactive SQL queries over big data, analysis of Big Data scale graphs, and scalable machine learning abilities. The evolution from Hadoop 1's limited processing model, comprising various batch-oriented MapReduce tasks, to the more specialized and interactive models of Hadoop 2 has showcased the potential value contributed by distributed and large-scale processing systems. Read on to note the major differences that exist between Hadoop 1 and 2.

Hadoop – YARN and HDFS
While other available solutions are likely to be unsuitable for interactive analytics, are I/O intensive, or are constrained with respect to graph support, memory-intensive algorithms and other machine learning workloads, Hadoop proves to be far ahead in the race. Creating a reliable, scalable and strong foundation for Big Data architectures, the Hadoop ecosystem has been positioned as one of the most dominant Big Data platforms for analytics. Here, it deserves mention that Hadoop developers rewrote major components of the Hadoop 1 file system to produce Hadoop 2. The resource manager YARN and HDFS federation were introduced as important advances in Hadoop 2.

HDFS – Hadoop file system with a difference

HDFS, the Hadoop distributed file system, comprises two main components: the block storage service and namespaces. While the block storage service deals with block operations, cluster management of data nodes, and replication, namespaces manage all operations on files/directories, especially the creation and modification of files and directories.

A single Namenode was responsible for managing the complete namespace of a Hadoop cluster in Hadoop 1. With the advent of HDFS federation, several Namenode servers can be used to manage namespaces. This in turn allows for performance improvements, horizontal scaling, and multiple namespaces. All in all, the implementation of HDFS federation lets existing Namenode configurations operate without changes. A shift to HDFS federation requires Hadoop administrators to format Namenodes and update them for use with the latest Hadoop cluster applications. It also involves adding more Namenodes to the Hadoop cluster.

YARN — Supports additional performance enhancements for Hadoop 2

While HDFS federation is responsible for bringing reliability and scalability to Hadoop, YARN brings significant performance enhancements for certain applications, implements an overall more flexible execution engine, and offers support for additional processing models. As a recap, YARN, the resource manager, was developed by separating the resource management capabilities from the processing engine of MapReduce as implemented in Hadoop 1.

Oft referred to as the operating system of Hadoop due to its role in managing and monitoring diverse workloads, implementing security controls, maintaining multi-tenant environs, and managing all high availability Hadoop features, YARN is designed for diverse, multiple, user applications that operate on a given multi-tenant platform. In addition to MapReduce, YARN supports other multiple processing models too.

High Availability Mode (HA) of Namenode

The name node stores all metadata in the Hadoop cluster. It is extremely important because an event such as an unexpected machine crash can bring down the entire Hadoop cluster. Hadoop 2.0 offers a solution to this problem: the High Availability feature of HDFS allows two redundant name nodes to run in the same cluster. These name nodes run in an active/passive configuration, with one operating as the primary name node and the other as a hot standby.

Both of these name nodes share an edits log, in which all changes are recorded in shared NFS storage. At any point in time, only a single writer is allowed to access this shared storage. The passive name node also has access to the storage and is responsible for keeping its metadata about the cluster up to date. If the active name node fails, the passive name node takes over as the active one and starts writing to the shared storage.

Enhanced Utilization of Resources

In Hadoop 1.0, the JobTracker held the dual responsibility of driving the execution of MapReduce jobs and managing the resources dedicated to the cluster. With YARN, the two major functionalities of the overburdened JobTracker, job scheduling/monitoring and resource management, are split into separate daemons. These are:

  • A Resource Manager (RM), which focuses on the management of cluster resources;
  • An Application Master (AM), typically one per running application, which manages the individual running application, for instance a MapReduce job.

It is essential to note that there are no longer inflexible MapReduce slots. With YARN as the central resource manager, multiple applications can now share a common pool of resources and run on Hadoop.

Batch Oriented application

In its 2.0 version, Hadoop goes well beyond its batch-oriented nature and can run interactive and streaming applications as well.

Native Windows Support

Originally, Hadoop was developed to support the UNIX family of operating systems. Hadoop 2.0 offers native support for the Windows operating system, which extends the reach of Hadoop significantly. It now caters to the ever-growing Windows Server market with flair.

Non MapReduce Applications on Hadoop 2.0

Hadoop 1.0 was compatible with MapReduce framework tasks only; they could process all data stored in HDFS, but other than MapReduce there were no other models for data processing. For things such as graph or real-time analysis of the data stored in HDFS, users had to move the data to alternate storage facilities like HBase. YARN helps Hadoop run non-MapReduce applications too: YARN APIs can be used to write other frameworks that run on top of HDFS. This enables different non-MapReduce applications to run on Hadoop, with MPI, Giraph, Spark, and HAMA being some applications that are well ported to run within YARN.

Data node caching for faster access

Hadoop 2.0 users and applications such as Pig, Hive, or HBase are capable of identifying the sets of files that require caching. For instance, the dimension tables used by Hive can now be configured to be cached in DataNode RAM, thereby allowing faster reads for Hive queries against the most frequently looked-up tables.

HDFS- Multiple Storage

Another important difference between Hadoop 1.0 and Hadoop 2.0 is the latter's support for heterogeneous storage. Whether it's SSDs or spinning disks, Hadoop 1.0 treats all storage devices as a single uniform pool on a DataNode. So, while Hadoop 1.0 users could store their data on an SSD, they had no control over that placement. Heterogeneous storage is an integral part of Hadoop from version 2.0 onwards. The approach is quite general and also permits users to treat memory as a storage tier for temporary and cached data.

HDFS Snapshots

Hadoop 2.0 offers additional support for file system snapshots: point-in-time images of the complete file system or of sub-trees of a file system. The many uses of snapshots include:

Protection against user errors: An admin-driven process can be set up to take snapshots periodically. So, if users happen to delete files accidentally, the lost data can be restored from a snapshot containing it.

Reliable backups: Snapshots of an entire file system or of sub-trees in the file system can be used by the admin as the starting point for full backups. Incremental backups can then be taken by copying the differences between any two given snapshots.

Disaster recovery: Snapshots may also be used to copy point-in-time images to remote sites for disaster recovery.
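As a hedged sketch of how an application might take a snapshot through the HDFS Java API (the directory path and snapshot name are made up, and an administrator must first have made the directory snapshottable, e.g. with hdfs dfsadmin -allowSnapshot):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TakeSnapshot {
    public static void main(String[] args) throws Exception {
        // Assumes the directory has already been made snapshottable by an admin.
        FileSystem fs = FileSystem.get(new Configuration());
        Path snapshotDir = new Path("/data/warehouse");                 // example path
        Path snapshot = fs.createSnapshot(snapshotDir, "daily-backup"); // example snapshot name
        System.out.println("Created snapshot at " + snapshot);
    }
}
```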

Hive vs. HBase – Different Technologies that Work Better Together

HBase and Hive are two Hadoop-based big data technologies that serve different purposes. For instance, when you log in to Facebook, you see multiple things like your friend list, your news feed, friend suggestions, people who liked your statuses, etc. With 1.79 billion monthly active users on Facebook and the profile page loading at lightning-fast speed, can you think of a single big data technology like Hadoop or Hive or HBase doing all this at the backend? All these technologies work together to render an awesome experience for all Facebook users. The complexity of big data systems requires that every technology be used in conjunction with the others.

Difference between Hive and Hbase

Let's consider the friend recommendations feature on Facebook; it is something that does not change every second or minute, so recommendations can be pre-computed for all Facebook users. Pre-computing friend recommendations requires high throughput, but high latency is acceptable; this is where Hadoop MapReduce or Hive is helpful. Your Facebook profile data or news feed, on the other hand, keeps changing, and there is a need for a NoSQL database that is faster than traditional RDBMSs. HBase plays the critical role of that database. In this case, the analytical use case can be accomplished using Apache Hive, and the results of the analytics need to be stored in HBase for random access.

Hive and HBase are both data stores for storing unstructured data. HBase is a NoSQL database used for real-time data access, whereas Hive is not really a database but a MapReduce-based SQL engine that runs on top of Hadoop. Directly comparing Hive vs. HBase might not be right, because HBase is a database and Hive is a SQL engine for batch processing of big data. Instead of asking what the difference between Hive and HBase is, let's try to understand what Hive and HBase each do, and when and how to use Hive and HBase together to build fault-tolerant big data applications.

Apache Hive

Hive is a SQL engine on top of Hadoop, designed for SQL-savvy people to run MapReduce jobs through SQL-like queries. Hive allows developers to impose a logical relational schema on various file formats and physical storage mechanisms within or outside the Hadoop cluster. SQL-like queries are run against those schemas as Hadoop MapReduce jobs. With limited write capabilities and interactivity, Hive is meant for the execution of batch transformations and large analytical queries.

When to use Hive

RDBMS professionals love Apache Hive, as they can simply map HDFS files to Hive tables and query the data. Even HBase tables can be mapped, and Hive can be used to operate on that data. Apache Hive should be used for data warehousing requirements and when programmers do not want to write complex MapReduce code. However, not all problems can be solved using Apache Hive. For big data applications that require complex and fine-grained processing, Hadoop MapReduce is the better choice.

Companies Using Apache Hive – Hive Use Cases

Apache Hive has approximately 0.3% of the market share i.e. 1902 companies are already using Apache Hive in production.

  • Scribd uses Hive for ad-hoc querying, data mining and for user facing analytics.
  • Hive is an integral part of the Hadoop pipeline at Hubspot for near real-time web analytics.
  • Chitika, the popular online advertising network uses Hive for data mining and analysis of its 435 million global user base.

HBase – The NoSQL Hadoop Database

Apache Hadoop does not provide random access capabilities, and this is where the Hadoop database HBase comes to the rescue. HBase is a highly scalable (it scales horizontally using off-the-shelf region servers), highly available, consistent, low-latency NoSQL database. With flexible data models, cost effectiveness and no manual sharding (sharding is automatic), HBase works well with sparse data. Before choosing HBase for your applications, do ask these questions:

  • Do you have sufficient hardware?
  • Do your applications require features that an RDBMS does not provide?
  • Do you have enough data?

When to use HBase

Apache Hadoop is not a perfect big data framework for real-time analytics, and this is where HBase can be used, i.e. for real-time querying of data. HBase is an ideal big data solution if the application requires random read or random write operations, or both. If the application needs to access some data in real time, that data can be stored in a NoSQL database. HBase has its own set of wonderful APIs that can be used to pull or push data. HBase can also be integrated perfectly with Hadoop MapReduce for bulk operations like analytics, indexing, etc. The best way to use HBase is to make Hadoop the repository for static data and HBase the data store for data that is going to change in real time after some processing.
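As a rough sketch of the random read/write pattern described above, using the HBase Java client (the table, column family, qualifier and row key are made-up examples):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ProfileStore {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("user_profiles"))) { // example table

            // Random write: update a single user's news-feed pointer in real time.
            Put put = new Put(Bytes.toBytes("user123"));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("feed"), Bytes.toBytes("latest-post-id"));
            table.put(put);

            // Random read: fetch that user's row with low latency.
            Result result = table.get(new Get(Bytes.toBytes("user123")));
            System.out.println(Bytes.toString(
                    result.getValue(Bytes.toBytes("info"), Bytes.toBytes("feed"))));
        }
    }
}
```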

HBase should be used when –

  • There is a large amount of data.
  • ACID properties are not a hard requirement.
  • The data model schema is sparse.
  • Your applications need to scale gracefully.

Companies Using HBase – HBase Use Cases

In the big data category, HBase has a market share of about 9.1%, i.e. approximately 6190 companies use HBase. Companies use HBase for time series analysis or for clickstream data storage and analysis.

  • The original inspiration for HBase was Google's Bigtable, which was built to store massive databases for the internet and its users.
  • Facebook uses HBase for real-time analytics, counting Facebook likes and for messaging.
  • FINRA (Financial Industry Regulatory Authority) uses HBase to store all the trading graphs.
  • Pinterest uses HBase to store the graph data.
  • Flipboard uses HBase to personalize the content feed for its users.

Hive vs. HBase – Difference between Hive and HBase

  • Hive is a query engine, whereas HBase is a data store, particularly for unstructured data.
  • Apache Hive is mainly used for batch processing, i.e. OLAP, whereas HBase is extensively used for transactional processing, where query response times need to be highly interactive, i.e. OLTP.
  • Unlike Hive, operations in HBase run in real time on the database instead of being transformed into MapReduce jobs.
  • HBase is for real-time querying, while Hive is for analytical queries.

Hive and HBase –Better Together

Since Hive has the limitation of high latency and HBase lacks analytical capabilities, integrating the two technologies is often the best solution. People working with big data commonly ask: "How do I use HBase from Hive? How well do Hive and HBase work together, and what is the best way to use them?"

Commonly HBase and Hive are used together on the same Hadoop cluster. Hive can be used as an ETL tool for batch inserts into HBase or to execute queries that join data present in HBase tables with the data present in HDFS files or in external data stores.

It is possible to write HiveQL queries over HBase tables so that HBase can make the best use of Hive's grammar and parser, query execution engine, query planner, etc. Apache Hive has an additional library for interacting with HBase, in which the middle layer between Hive and HBase is implemented. When accessing HBase from Hive queries, a storage handler called HBaseStorageHandler serves as the primary interface. The application can also interact with HBase tables directly through input and output formats, but the handler is easier to use and works well with most use cases. The interface between Hive and HBase is still maturing, but it has great potential. The main issue when integrating Hive with HBase is the impedance mismatch between HBase's sparse, untyped schema and Hive's dense, typed schema.

10 Big Differences between Hadoop 1 & Hadoop 2

Hadoop, the solution for deciphering the avalanche of Big Data, has come a long way from the time Google published its paper on the Google File System in 2003 and MapReduce in 2004. It created waves with its scale-out, rather than scale-up, strategy. Work by Doug Cutting, the team at Yahoo and the Apache Hadoop project popularized MapReduce programming, which is I/O intensive and constrained in interactive analysis and graphics support. This paved the way for the further evolution of Hadoop 1 into Hadoop 2. The following table describes the major differences between them:
