The Art of Spark Kernel Design

Oliver Hu
9 min read · Mar 24, 2019


Reading Notes Chapter 6–10

Spark Version 2.1; Scala version 2.11


Chapter 6 Storage System

Overview

The Spark storage system is composed of the BlockManagers in the Driver and Executor instances. BlockManager depends on the following components:

  • BlockManagerMaster: BlockManager’s delegate to talk to the BlockManagerMasterEndpoint in Driver.
  • BlockManagerMasterEndpoint: created by SparkEnv and registered in Driver’s RpcEnv. This only exists in Driver’s SparkEnv.
  • BlockManagerSlaveEndpoint: exists in both Executor & Driver. Receives commands from BlockManagerMasterEndpoint.
  • SerializerManager.
  • MemoryManager: responsible for allocating and reclaiming memory on a single node.
  • MapOutputTracker: track map output.
  • ShuffleManager: manage Shuffle.
  • BlockTransferService: serve block transfer.
  • ShuffleClient: client of Shuffle.
  • SecurityManager, DiskBlockManager, BlockInfoManager, MemoryStore, DiskStore.

Concepts

BlockManagerId

Unique for each BlockManager. It contains:

  • host_, port_, executorId_, topologyInfo_, executorId, hostPort, host, port, topologyInfo, isDriver
  • writeExternal: serializes the BlockManagerId and writes it to an external output.
  • readExternal: reads a BlockManagerId back from an external input.

BlockId

Unique for each Block. It contains:

  • name: unique global name
  • isRDD: if the BlockId is RDDBlockId
  • asRDDId: convert BlockId into RDDBlockId.
  • isShuffle: if current BlockId is ShuffleBlockId.
  • isBroadcast: if current BlockId is BroadcastBlockId.

StorageLevel

  • useDisk: whether the block can be written to disk
  • useMemory: whether the block can be written to memory
  • useOffHeap: whether off-heap memory is used.
  • deserialized: whether the block is stored as deserialized objects (rather than serialized bytes).
  • replication: number of replicas.
  • memoryMode: ON_HEAP | OFF_HEAP
  • clone: make a clone
  • isValid: whether the level is valid (uses memory or disk, and replication ≥ 1).
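
As a quick, hedged illustration of how these flags show up in practice (assumes a live SparkContext named sc; the HDFS path is illustrative):

import org.apache.spark.storage.StorageLevel

// Keep deserialized objects in memory, spill to disk when memory is short,
// and keep 2 replicas of each partition.
val cached = sc.textFile("hdfs:///data/events")      // path is illustrative
  .persist(StorageLevel.MEMORY_AND_DISK_2)

// The flags listed above are visible on the level itself:
val level = StorageLevel.MEMORY_AND_DISK_2
assert(level.useMemory && level.useDisk && !level.useOffHeap)
assert(level.deserialized && level.replication == 2)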

BlockInfo

Metadata of a block.

  • level: StorageLevel level
  • classTag: Block type.
  • tellMaster: if this block should be announced to master
  • size: size of block
  • readerCount: the number of read locks currently held on the block.
  • writerTask: the id of the task attempt that holds the block’s write lock, if any.

BlockResult

  • storageLevel: the block’s StorageLevel
  • memSize: size in memory
  • diskSize: size on disk
  • isCached: whether the block is in the storage system.

BlockInfoManager

BlockInfoManager primarily manages the locking resources of Block.

Block Lock
BlockInfoManager uses shared locks for reads and an exclusive lock for writes.

  • infos: mapping between BlockId and BlockInfo
  • writeLocksByTask: mapping from a TaskAttemptId to the Blocks it holds write locks on.
  • readLocksByTask: mapping from a TaskAttemptId to the Blocks it holds read locks on (with counts).

DiskBlockManager

DiskBlockManager is responsible for managing Blocks and their locations on disk.

  • conf: SparkConf
  • deleteFilesOnStop: whether to delete the local directories when DiskBlockManager stops.
  • localDirs: array of local directories.

Structure:

LocalDir  [blockmgr-UUID-0]  [blockmgr-UUID-1]  ...
  SubDir  [0]  [1]  ...
    Blocks  [Block A]  [Block B]  ...
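
A hedged sketch of how a block file name is mapped into this directory tree (paraphrasing the hashing scheme in DiskBlockManager.getFile; directory creation and locking are omitted):

// Simplified sketch of DiskBlockManager's file placement: hash the file name,
// pick a local dir, then pick a sub-directory inside it.
def getFileSketch(filename: String,
                  localDirs: Array[java.io.File],
                  subDirsPerLocalDir: Int): java.io.File = {
  val hash = filename.hashCode & Integer.MAX_VALUE              // non-negative hash
  val dirId = hash % localDirs.length                           // which local dir
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir // which sub-dir
  val subDir = new java.io.File(localDirs(dirId), "%02x".format(subDirId))
  new java.io.File(subDir, filename)
}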

DiskStore

DiskStore is responsible for writing Blocks to disk.

  • conf: SparkConf
  • diskManager: DiskBlockManager
  • minMemoryMapBytes: threshold (spark.storage.memoryMapThreshold) that decides whether a file is read directly into a buffer or memory-mapped via FileChannel.

MemoryManager

A key difference between Hadoop & Spark is Spark’s usage of memory.

MemoryPool

Spark uses both on heap memory and off heap memory.

  • lock: lock instance
  • poolSize: size of the pool (in bytes)
  • memoryUsed: used memory.
  • memoryFree: available memory
  • incrementPoolSize: increase size of memory pool
  • decrementPoolSize: decrease size of memory pool
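
A minimal sketch of the bookkeeping described above (names mirror the bullet list; Spark’s real MemoryPool is abstract and synchronizes on a lock object handed in by MemoryManager — this is an illustration, not Spark’s exact code):

// Minimal sketch of MemoryPool's bookkeeping.
class MemoryPoolSketch(lock: Object) {
  private var _poolSize: Long = 0L
  private var _memoryUsed: Long = 0L

  def poolSize: Long   = lock.synchronized { _poolSize }
  def memoryUsed: Long = lock.synchronized { _memoryUsed }
  def memoryFree: Long = lock.synchronized { _poolSize - _memoryUsed }

  def incrementPoolSize(delta: Long): Unit = lock.synchronized {
    require(delta >= 0); _poolSize += delta
  }
  def decrementPoolSize(delta: Long): Unit = lock.synchronized {
    require(delta >= 0 && _poolSize - delta >= _memoryUsed); _poolSize -= delta
  }
}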

StorageMemoryPool

An impl of MemoryPool

  • acquireMemory(blockId: BlockId, numBytes: Long): acquires numBytes of storage memory for the Block identified by blockId.
  • releaseMemory(size: Long)
  • freeSpaceToShrinkPool(spaceToFree: Long)

MemoryManager interface

  • conf: SparkConf
  • numCores: CPU cores
  • onHeapStorageMemory
  • onHeapExecutionMemory
  • onHeapStorageMemoryPool
  • offHeapStorageMemoryPool
  • onHeapExecutionMemoryPool
  • offHeapExecutionMemoryPool
  • maxOffHeapMemory
  • offHeapStorageMemory

Methods:

  • maxOnHeapStorageMemory
  • maxOffHeapStorageMemory
  • setMemoryStore
  • acquireStorageMemory
  • acquireUnrollMemory
  • releaseStorageMemory
  • releaseAllStorageMemory
  • releaseUnrollMemory
  • storageMemoryUsed

UnifiedMemoryManager

Unified memory management mechanism.

  • conf: SparkConf
  • maxHeapMemory: max memory on heap
  • onHeapStorageRegionSize.
  • numCores
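
A hedged, worked example of how the unified on-heap region is sized with Spark 2.1 defaults (300 MB reserved memory, spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5); the executor heap size is illustrative:

// Default on-heap sizing in the unified memory model (Spark 2.1 defaults).
val systemMemory   = 4L * 1024 * 1024 * 1024        // e.g. a 4 GB executor heap
val reservedMemory = 300L * 1024 * 1024             // reserved for Spark internals
val usableMemory   = systemMemory - reservedMemory
val maxHeapMemory           = (usableMemory * 0.6).toLong   // storage + execution region
val onHeapStorageRegionSize = (maxHeapMemory * 0.5).toLong  // storage's "soft" share
// Storage and execution borrow from each other inside maxHeapMemory; cached blocks
// can be evicted back down to onHeapStorageRegionSize when execution needs memory.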

MemoryStore

Responsible for storing Block in memory.

  • entries: mapping between BlockId and MemoryEntry
  • onHeapUnrollMemoryMap
  • offHeapUnrollMemoryMap
  • unrollMemoryThreshold


Methods:

  • getSize(blockId: BlockId): Long
  • putBytes(blockId, size, memoryMode, bytes): Boolean
  • reserveUnrollMemoryForThisTask
  • releaseUnrollMemoryForThisTask
  • putIteratorAsValues
  • putIteratorAsBytes
  • getBytes(blockId)
  • getValues(blockId)
  • remove
  • clear
  • evictBlocksToFreeSpace
  • contains(blockId)

BlockManager

This is the CORE. Methods:

  • reregister: register BlockManager to BlockManagerMaster and report Block information to BlockManagerMaster.
  • getLocalBytes(blockId: BlockId) -> ChunkedByteBuffer
  • getBlockData(blockId: BlockId) -> ManagedBuffer: get data from local Block.
  • putBytes
  • putBlockData
  • getMatchingBlockIds
  • getLocalValues
  • getRemoteBytes(blockId: BlockId) : read Block data from remote.
  • get(blockId: BlockId): read the Block locally first; if it does not exist, read it from a remote node.
  • downgradeLock
  • releaseLock
  • registerTask(taskAttemptId: Long) register task attempt to BlockInfoManager.
  • releaseAllLocksForTask
  • getOrElseUpdate
  • putIterator
  • getDiskWriter
  • getSingle: get a Block made of a single object
  • putSingle
  • dropFromMemory: drop a Block from memory; if its StorageLevel allows disk, write it to disk first, otherwise discard it.
  • removeBlock
  • removeRdd(rddId: Int): remove all Blocks belonging to an RDD.
  • removeBroadcast(broadcastId: Long, tellMaster: Boolean): remove all Blocks belonging to a Broadcast.
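
A conceptual, self-contained sketch of the local-then-remote read order behind get and getOrElseUpdate (the readLocal/readRemote parameters are stand-ins for getLocalValues and getRemoteBytes; the real method also manages locks and metrics):

// Read order used by BlockManager.get, reduced to its essence.
def getBlockSketch[T](blockId: String,
                      readLocal: String => Option[T],
                      readRemote: String => Option[T]): Option[T] =
  readLocal(blockId) match {            // memory first, then disk, on this node
    case found @ Some(_) => found
    case None            => readRemote(blockId)  // locations come from BlockManagerMaster
  }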

Management of BlockManager from BlockManagerMaster

All communication about BlockManagers between the Driver and the Executors goes through BlockManagerMaster.

BlockManagerMaster:

  • RemoveExecutor
  • RegisterBlockManager
  • UpdateBlockInfo
  • GetLocations
  • GetLocationsMultipleBlockIds
  • GetPeers
  • GetExecutorEndpointRef
  • RemoveBlock
  • RemoveRdd
  • RemoveShuffle
  • RemoveBroadcast
  • GetMemoryStatus
  • GetStorageStatus
  • GetBlockStatus
  • GetMatchingBlockIds
  • HasCachedBlocks
  • StopBlockManagerMaster

Block Transfer Service

BlockManager uses NettyBlockTransferService to transfer blocks.

To be continued..

Chapter 7 Orchestration System

Overview

There are two levels of orchestration: level 1 is cluster-level resource management (YARN, Mesos, or Spark standalone); this chapter focuses on level 2, the DAGScheduler & TaskScheduler.

Scheduling process:

  1. Build operator DAG
  2. split graph into stages of tasks
  3. launch tasks via cluster manager
  4. execute tasks

What is RDD

Why RDD?

  1. Data Processing Model

RDD is a resilient parallel data structure. RDD provides a list of operators to transform RDDs.

2. Categorize Dependencies

Dependencies are divided into NarrowDependency & ShuffleDependency.

3. Performance

RDD allows parallel execution on multiple nodes.

4. Disaster Recovery

RDD is an immutable dataset; a failed task can be recovered by recomputing partitions from lineage or by restoring from checkpoints.

RDD Dependencies

  1. Narrow Dependency

Each partition of the child RDD depends on a fixed, small set of partitions of its upstream (parent) RDD. NarrowDependency has two subclasses:

  • OneToOneDependency

Each partition of the child RDD maps one-to-one to the partition with the same index in the parent RDD.

  • RangeDependency

A contiguous range of partitions in the child RDD maps one-to-one to a range of partitions in the parent RDD: the child partition with index partitionId corresponds to the parent partition with index partitionId - outStart + inStart (valid for partitionId in [outStart, outStart + length)).
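
The mapping can be read off a paraphrase of RangeDependency.getParents, written here as a free-standing function for illustration:

// Child partition -> parent partition(s) for RangeDependency (paraphrased).
def rangeDependencyParents(partitionId: Int, inStart: Int, outStart: Int, length: Int): List[Int] =
  if (partitionId >= outStart && partitionId < outStart + length)
    List(partitionId - outStart + inStart)   // exactly one parent partition
  else
    Nil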

2. ShuffleDependency

There is no one-to-one partition mapping between the child RDD and the parent RDD: a child partition may depend on several (or all) partitions of the upstream RDD, so a shuffle is required.

Partitioner

Partitioner decides how the output of an upstream RDD is distributed across the partitions of downstream RDDs, i.e. which partition each key-value pair goes to.
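
For example, the most common implementation, HashPartitioner, routes a record by its key’s hash code; a small sketch of its getPartition logic (paraphrased, with null keys going to partition 0):

// Sketch of HashPartitioner.getPartition: non-negative (hashCode mod numPartitions).
def getPartition(key: Any, numPartitions: Int): Int = key match {
  case null => 0
  case _ =>
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
}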

Stages

DAGScheduler puts RDDs into different Stages, cutting the graph at ShuffleDependency boundaries, and forms a dependency graph between the Stages. Stages without unmet dependencies can run in parallel.

ResultStage

ResultStage is the final stage of a job; it applies a function to partitions of its RDD to produce the result of the action.

ShuffleMapStage

A ShuffleMapStage includes one or more ShuffleMapTasks.

DAGScheduler

DAGScheduler splits the RDD graph into stages. All components talk to DAGScheduler by sending DAGSchedulerEvents.

Job submission

  1. Submit the job

DAGScheduler has a

def runJob[T, U](…): Unit

method to submit the job.
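
For orientation, a hedged example of how an action reaches this path through the public SparkContext API (assumes a live SparkContext named sc; collect, reduce and foreach go through the same call chain):

// Each action ends up in SparkContext.runJob, which delegates to DAGScheduler.runJob.
val rdd  = sc.parallelize(1 to 100, numSlices = 4)
val sums = sc.runJob(rdd, (iter: Iterator[Int]) => iter.sum)  // one result per partition
// sums is an Array[Int] of length 4, one partial sum per partition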

2. Process job submission

DAGSchedulerEventProcessLoop invokes DAGScheduler’s handleJobSubmitted method after receiving JobSubmitted event.

Workflow

  1. Spark composes the RDD DAG and calls DAGScheduler’s runJob method.
  2. DAGScheduler sends a JobSubmitted event, and DAGSchedulerEventProcessLoop queues the JobSubmitted event.
  3. DAGSchedulerEventProcessLoop fetches the DAGSchedulerEvent and invokes its doOnReceive method to process the event.
  4. DAGSchedulerEventProcessLoop’s doOnReceive invokes DAGScheduler’s handleJobSubmitted method to process the event.
  5. DAGScheduler submits the tasks of the most upstream stages to TaskScheduler first, then gradually submits downstream stages’ tasks as their parents complete.

Scheduling Pool

All TaskSets are put into a scheduling pool, and the pool schedules each TaskSet with a scheduling algorithm (FIFO or FAIR).

Algorithms

  1. FIFOSchedulingAlgorithm: compares priority, then stageId (a sketch follows this list).
  2. FairSchedulingAlgorithm: compares whether each schedulable is below its minShare, then the minShare usage ratio, then the weighted running-task ratio.
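
A self-contained sketch of the FIFO comparison, with a simplified stand-in for Spark’s Schedulable trait (the lower priority value, i.e. the earlier job, wins; ties go to the lower stageId):

// Stand-in for Spark's Schedulable, reduced to the two fields FIFO compares.
case class SchedulableSketch(priority: Int, stageId: Int)

def fifoLessThan(s1: SchedulableSketch, s2: SchedulableSketch): Boolean =
  if (s1.priority != s2.priority) s1.priority < s2.priority
  else s1.stageId < s2.stageId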

Pool Impl

Pool is similar to a priority queue.

TaskSetManager

TaskSetManager manages a TaskSet and allocates resources to its Tasks. TaskSchedulerImpl depends on TaskSetManager.

LauncherBackend

When the Spark application runs in an independent process rather than inside the user application, the user application can communicate with the Spark application through LauncherServer.

BackendConnection

BackendConnection is an internal component of LauncherBackend; it connects to LauncherServer’s socket to send & receive messages. It has the following methods:

  • handle: process messages from LauncherServer
  • run: read messages from client socket.
  • send: talk to LauncherServer
  • close: close the socket connection with LauncherServer.

LauncherBackend

LauncherBackend is the communication component between SchedulerBackend and LauncherServer.

SchedulerBackend

TaskScheduler allocates resources to Tasks through SchedulerBackend.

TaskResultGetter

TaskResultGetter deserializes the serialized results of Task execution.

TaskScheduler

This interface defines how to schedule tasks.

Chapter 8 Computing Engine

Overview

Spark’s computing engine includes execution memory and Shuffle.

  • Execution Memory

Includes execution memory pools, TaskMemoryManager, MemoryConsumer, etc. Each Task attempt has its own TaskMemoryManager. MemoryManager manages both off-heap (OS) memory and JVM heap memory.

  • Shuffle

In early versions of Spark shuffle:

  1. the map task creates a bucket for each reduce task, resulting in M x R buckets.
  2. the map task writes intermediate results to the buckets based on partition.
  3. the reduce task reads blocks from the local or a remote map task’s BlockManager.

Problems with this approach:

  1. the map task writes intermediate results to memory first and then to disk, which consumes a lot of memory.
  2. too many buckets lead to frequent small I/O.

The improvements made to address this:

  1. Merge the partition outputs (buckets) for each reduce task into the same file, reducing the number of shuffle files.
  2. The map task outputs records one by one and uses the AppendOnlyMap cache to aggregate intermediate data, greatly reducing the memory consumed by intermediate results.
  3. The reduce task reads intermediate results from the map task record by record instead of reading everything at once, reducing memory consumption.
  4. The reduce task groups requests for the needed Blocks by BlockManager address, reducing network I/O.

MemoryManager

ExecutionMemoryPool

ExecutionMemoryPool inherits MemoryPool; it is a logical memory pool that can be backed by either heap or off-heap memory. Some methods:

  • acquireMemory: allocate numBytes memory for taskAttemptId.
  • releaseMemory: release numBytes memory for taskAttemptId.
  • releaseAllMemoryForTask: release all memory for taskAttemptId.

MemoryManager (interface)

MemoryManager’s methods:

  • acquireExecutionMemory
  • releaseExecutionMemory
  • releaseAllExecutionMemoryForTask
  • executionMemoryUsed
  • getExecutionMemoryUsageForTask.

UnifiedMemoryManager (An impl of MemoryManager)

MemoryManager & Tungsten

  • MemoryBlock

Tungsten implements a data structure similar to OS memory Page. It is a logical representation of either heap memory or off heap memory.

  • MemoryAllocator

This has two types of memory allocation: HeapMemoryAllocator and UnsafeMemoryAllocator.

TaskMemoryManager

TaskMemoryManager manages memory allocation and release for an individual task attempt. It depends on MemoryManager.
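
One detail worth calling out is how TaskMemoryManager addresses Tungsten pages. A hedged sketch of the 64-bit encoding (13 bits of page number, 51 bits of offset within the page), written as free-standing functions for illustration:

// Logical 64-bit address = (page number in the high 13 bits) | (in-page offset in the low 51 bits).
val PAGE_NUMBER_BITS = 13
val OFFSET_BITS      = 64 - PAGE_NUMBER_BITS          // 51
val OFFSET_MASK      = (1L << OFFSET_BITS) - 1

def encodePageNumberAndOffset(pageNumber: Int, offsetInPage: Long): Long =
  (pageNumber.toLong << OFFSET_BITS) | (offsetInPage & OFFSET_MASK)

def decodePageNumber(address: Long): Int = (address >>> OFFSET_BITS).toInt
def decodeOffset(address: Long): Long    = address & OFFSET_MASK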

Task

TaskContext

TaskContext maintains the context when a Task is being executed. TaskContextImpl is the only implementation of TaskContext:

  • stageId
  • partitionId
  • taskAttemptId
  • attemptNumber
  • taskMemoryManager
  • metricsSystem
  • taskMetrics
  • onCompleteCallbacks
  • onFailureCallbacks
  • interrupted
  • completed
  • failed
  • addTaskCompletionListener
  • addTaskFailureListener
  • markTaskFailed
  • markInterrupted
  • getMetricsSources
  • registerAccumulator

Task is the smallest execution unit in a Spark job. ShuffleMapTask is analogous to a MapTask in Hadoop, and ResultTask is analogous to a ReduceTask in Hadoop.

IndexShuffleBlockResolver

ShuffleBlockResolver defines how to retrieve Shuffle Block data, including getting the shuffle data file, getting the shuffle index file, deleting shuffle files, and getting shuffle block data. IndexShuffleBlockResolver is its index-file-based implementation.

Sampling & Estimate

In the shuffle stage, Spark uses caches and other in-memory data structures. Their sizes need to be estimated cheaply so Spark knows when to spill to disk. SizeTracker defines how to sample and estimate.

  • SAMPLE_GROWTH_RATE: growth rate of sampling
  • samples: list of samples
  • bytesPerUpdate
  • numUpdates
  • nextSampleNum
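
A hedged sketch of the sampling scheme these fields describe: full size estimations run only at exponentially spaced update counts (SAMPLE_GROWTH_RATE is 1.1), and sizes in between are extrapolated from bytesPerUpdate:

// Exponential back-off sampling, as used by SizeTracker (paraphrased).
val SAMPLE_GROWTH_RATE = 1.1

def nextSampleNum(numUpdates: Long): Long =
  math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong   // when the next full sample runs

def estimateSize(lastSampledSize: Long, bytesPerUpdate: Double,
                 numUpdates: Long, numUpdatesAtLastSample: Long): Long =
  (lastSampledSize + bytesPerUpdate * (numUpdates - numUpdatesAtLastSample)).toLong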

WritablePartitionedPairCollection

WritablePartitionedPairCollection is a collection of key-value pairs whose keys carry a partitionId, so its contents can be iterated and written out partition by partition.

AppendOnlyMap

AppendOnlyMap supports a null key and up to 0.7 * 2^29 elements.

  • initialCapacity: default 64.
  • mask: bit mask (capacity - 1) used to compute an element’s position in the data array.
  • capacity: capacity of the current data array.
  • data: array storing keys and their merged values.
  • LOAD_FACTOR: load factor that triggers growth, 0.7.
  • growThreshold
  • curSize: number of items in data
  • haveNullValue: whether a value has been stored for the null key.
  • nullValue
  • destroyed: whether the data array has been handed off and can no longer be used.
  • destructionMessage.

PartitionedPairBuffer

PartitionedPairBuffer caches key-value pairs in memory and sorts them. It extends both WritablePartitionedPairCollection and SizeTracker.

ExternalSorter

ExternalSorter is used to sort output data on either the map or the reduce side. Spark has two external sorters — ExternalSorter and ShuffleExternalSorter. ExternalSorter stores a map task’s output on the JVM heap and merges the data when an aggregation function is specified. ShuffleExternalSorter is used to sort Shuffle data; it does not implement persistence itself and relies on UnsafeShuffleWriter to persist the data.

ShuffleManager

ShuffleManager manages Shuffle operations. So far it has only one implementation, SortShuffleManager.

ShuffleWriter

SortShuffleManager depends on ShuffleWriter. ShuffleWriter has three subclasses: SortShuffleWriter, UnsafeShuffleWriter, and BypassMergeSortShuffleWriter.

  • SortShuffleWriter: uses ExternalSorter as its sorter and supports merging (map-side aggregation) of shuffle data. Its key job is to write a map task’s output to disk. When the mapSideCombine property is false, key-value pairs are not aggregated.
  • BypassMergeSortShuffleWriter: a ShuffleWriter without sort & merge.
  • UnsafeShuffleWriter: uses ShuffleExternalSorter as its external sorter and therefore cannot merge (aggregate). It uses Tungsten memory to improve I/O efficiency.
  • ShuffleBlockFetcherIterator: an iterator that fetches multiple blocks. If a Block is local, it is fetched from the local BlockManager; otherwise ShuffleClient requests the Block from BlockTransferService.
  • BlockStoreShuffleReader: used to read a reduce task’s input, i.e. the Block data for partitions in [startPartition, endPartition), from other nodes.

Combination of Shuffle at Map & Reduce

Merge in both map & reduce

  • ShuffleDependency has mapSideCombine = true
  • merge function is specified
  • the ShuffleDependency does not support serialized (Tungsten) shuffle.

Cache in map and merge in reduce.

  • ShuffleDependency has mapSideCombine = false
  • ShuffleDependency’s partition number is larger than spark.shuffle.sort.bypassMergeThreshold.
  • the ShuffleDependency does not support serialized (Tungsten) shuffle.
  • merge function specified.

Cache in map and no merge in reduce

  • ShuffleDependency has mapSideCombine = false
  • ShuffleDependency’s partition number is larger than spark.shuffle.sort.bypassMergeThreshold.
  • the ShuffleDependency does not support serialized (Tungsten) shuffle.
  • merge function not specified.

Bypass sort & merge in map and no merge in reduce

  • ShuffleDependency has mapSideCombine = false
  • ShuffleDependency’s partition number is smaller than spark.shuffle.sort.bypassMergeThreshold.
  • merge function not specified.
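
The four cases above correspond to how SortShuffleManager picks a writer. A hedged, self-contained sketch of that decision (paraphrasing registerShuffle, shouldBypassMergeSort and canUseSerializedShuffle; the parameter names are stand-ins):

// Which ShuffleWriter handles a given ShuffleDependency (simplified).
def pickShuffleWriter(mapSideCombine: Boolean,
                      aggregatorDefined: Boolean,
                      serializerSupportsRelocation: Boolean,
                      numPartitions: Int,
                      bypassMergeThreshold: Int): String =
  if (!mapSideCombine && numPartitions <= bypassMergeThreshold)
    "BypassMergeSortShuffleWriter"      // no map-side sort or merge at all
  else if (serializerSupportsRelocation && !aggregatorDefined && numPartitions <= (1 << 24))
    "UnsafeShuffleWriter"               // serialized (Tungsten) shuffle, no merge
  else
    "SortShuffleWriter"                 // ExternalSorter; supports map-side combine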

Chapter 9 Deployment

Spark supports the following types of deployments:

  • local: debugging
  • local-cluster: debugging
  • standalone: spark://host:port. Supports HA and distributed deployment; production-ready.
  • 3rd party deployment: YARN / Mesos / Zk / Simr

HeartbeatReceiver

HeartbeatReceiver runs in the Driver and receives Heartbeats from Executors. It implements the following:

  • addExecutor(executorId: String): called after a SparkListenerExecutorAdded event is received.
  • onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved)
  • expireDeadHosts(): checks for expired hosts.
  • Heartbeat: returns a HeartbeatResponse to the executor that sent the heartbeat to HeartbeatReceiver.

Executor Implementation

Sends heartbeats and executes tasks.

Local Deployment

Local deployment has only the Driver; there is no Master or Worker. The Executor and the Driver run in the same JVM process.

PersistenceEngine

PersistenceEngine persists the Master’s state so that, when the active Master fails, the newly elected Master can recover it. PersistenceEngine defines how the Master’s state information is serialized and stored.

  • FileSystemPersistenceEngine: file-system-based persistence engine.
  • ZooKeeperPersistenceEngine: ZooKeeper-based persistence engine.

Leader Election

Make sure only one Master is in Active mode.

Local Deployment, Local Cluster Deployment & 3rd Party Deployment

Chapter 10 Spark API

DataSource

DataSource is responsible for looking up, creating, and resolving relations between the different external data sources.

Checkpoints

Checkpointing is used for disaster recovery; it persists in-memory data structures to disk. Spark uses checkpoints primarily to store RDD computation state so that it does not have to be recalculated.

RDD

Transform API

  • mapPartitions: transforms the RDD into a MapPartitionsRDD, applying a function per partition.
  • mapPartitionsWithIndex: applies a function that also receives the partition index to every partition, producing a MapPartitionsRDD.
  • mapPartitionsWithIndexInternal: creates a MapPartitionsRDD that applies a function to every partition.
  • flatMap
  • map
  • toJavaRDD

Action API

Transformations do not execute the actual computation; actions do (see the example after this list).

  • collect: will invoke runJob in SparkContext
  • foreach: will invoke runJob in SparkContext
  • reduce: will invoke runJob in SparkContext
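
A small, hedged example of the difference (assumes a live SparkContext named sc; the input path is illustrative):

// Transformations only build the DAG; the action at the end triggers runJob.
val words  = sc.textFile("hdfs:///tmp/input")        // lazy
val counts = words.flatMap(_.split(" "))
                  .map(w => (w, 1))
                  .reduceByKey(_ + _)                 // still lazy, adds a ShuffleDependency
val result = counts.collect()                         // action: SparkContext.runJob runs here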

Checkpoint API

The user must explicitly call checkpoint() (after setting a checkpoint directory) to enable checkpointing, as sketched below.
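
A minimal usage sketch (assumes a live SparkContext named sc; the checkpoint directory is illustrative):

// checkpoint() only marks the RDD; the data is written when the next action runs.
sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")
val rdd = sc.parallelize(1 to 1000).map(_ * 2)
rdd.checkpoint()
rdd.count()        // the action triggers both the computation and the checkpoint write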

Dataset

A strongly typed collection of domain-specific objects.

DataFrameReader

DataFrameReader loads data from different external data sources into a DataFrame (or, together with an encoder, a Dataset).
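
A typical, hedged usage sketch (assumes a SparkSession named spark; the path and the Event case class are illustrative):

// Load an external JSON source as a DataFrame, then view it as a typed Dataset.
case class Event(id: Long, name: String)

val df = spark.read
  .format("json")
  .load("hdfs:///data/events.json")    // DataFrame, i.e. Dataset[Row]

import spark.implicits._
val ds = df.as[Event]                  // strongly typed Dataset[Event]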

SparkSession

Centralized entry point for Spark operations.

  • sparkContext
  • sharedState
  • sessionState
  • sqlContext
  • conf
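
For completeness, the standard way to obtain one (a minimal sketch; the app name and master are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("reading-notes-demo")
  .master("local[*]")
  .getOrCreate()

val sc = spark.sparkContext            // the underlying SparkContext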

Thoughts

Chapters 1–5 have the best ROI: they are short but provide a fairly good overview of Spark. Chapters 6–10 are much more detailed, going all the way down to the code level; that may be useful for day-to-day Spark core developers, but less so for readers who just want a better understanding of Spark’s architecture.
