Reading Notes Chapter 6–10
Spark Version 2.1; Scala version 2.11
Chapter 6 Storage System
Overview
The Spark storage system is composed of the BlockManagers running in the Driver and Executor instances. BlockManager depends on the following components:
- BlockManagerMaster: BlockManager’s delegate to talk to the BlockManagerMasterEndpoint in Driver.
- BlockManagerMasterEndpoint: created by SparkEnv and registered in Driver’s RpcEnv. This only exists in Driver’s SparkEnv.
- BlockManagerSlaveEndpoint: exists in both Executor & Driver. Receives commands from BlockManagerMasterEndpoint.
- SerializerManager.
- MemoryManager: responsible for allocating and reclaiming memory on a single node.
- MapOutputTracker: track map output.
- ShuffleManager: manage Shuffle.
- BlockTransferService: serve block transfer.
- ShuffleClient: client of Shuffle.
- SecurityManager, DiskBlockManager, BlockInfoManager, MemoryStore, DiskStore.
Concepts
BlockManagerId
Unique for each BlockManager. It contains:
- host_, port_, executorId_, topologyInfo_, executorId, hostPort, host, port, topologyInfo, isDriver
- writeExternal: serialize the BlockManagerId and write it to an external output.
- readExternal: read the BlockManagerId from an external input.
BlockId
Unique for each Block, it contains:
- name: unique global name
- isRDD: if the BlockId is RDDBlockId
- asRDDId: convert BlockId into RDDBlockId.
- isShuffle: if current BlockId is ShuffleBlockId.
- isBroadCast: if current BlockId is BroadcastBlockId.
StorageLevel
- useDisk: if block can be written to disk
- useMemory: if can be written to memory
- useOffHeap: if use offHeap.
- deserialized: whether the block is stored in deserialized form.
- replication: number of replicas.
- memoryMode: ON_HEAP | OFF_HEAP
- clone: make a clone
- isValid: if is valid.
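To see how these flags map to the user-facing API, here is a small example of persisting an RDD with an explicit StorageLevel (standard Spark API; the data itself is made up):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("storage-level-demo").setMaster("local[*]"))
val rdd = sc.parallelize(1 to 1000000)

// MEMORY_AND_DISK_SER: useDisk = true, useMemory = true, useOffHeap = false,
// deserialized = false, replication = 1
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd.count()  // the first action materializes and caches the blocks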
BlockInfo
Metadata of a block.
- level: StorageLevel level
- classTag: Block type.
- tellMaster: if this block should be announced to master
- size: size of block
- readerCount: number of read locks currently held on the block.
- writerTask: the task attempt ID that holds the block's write lock.
BlockResult
- storageLevel: level in StorageLevel
- memSize: size in memory
- diskSize: size in disk
- isCached: if in storage system.
BlockInfoManager
BlockInfoManager primarily manages the locking resources of Block.
Block Lock
BlockInfoManager uses shared locks for reads and exclusive locks for writes.
- infos: mapping between BlockId and BlockInfo
- writeLocksByTask: mapping from TaskAttemptId to the BlockIds on which it holds write locks.
- readLocksByTask: mapping from TaskAttemptId to the BlockIds on which it holds read locks.
DiskBlockManager
DiskBlockManager is responsible for managing Blocks and their locations on disk.
- conf: SparkConf
- deleteFilesOnStop: whether to delete the local directories when DiskBlockManager stops.
- localDirs: array of local directories.
Structure:
- localDirs: [blockmgr-UUID-0], [blockmgr-UUID-1], ...
  - subDirs: [0], [1], ... (per local directory)
    - Block files: [Block A], ...
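As I understand the scheme from the Spark source, a block file name is hashed to pick the local directory and then the sub-directory; a minimal sketch of that placement logic (illustrative only, not Spark's actual code; subDirsPerLocalDir defaults to 64):

import java.io.File

def fileFor(filename: String, localDirs: Array[File], subDirsPerLocalDir: Int): File = {
  val hash = filename.hashCode & Integer.MAX_VALUE               // non-negative hash of the file name
  val dirId = hash % localDirs.length                            // which blockmgr-UUID-* directory
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir  // which sub-directory inside it
  new File(new File(localDirs(dirId), "%02x".format(subDirId)), filename)
}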
DiskStore
DiskStore is responsible for writing Blocks to disk.
- conf: SparkConf
- diskManager: DiskBlockManager
- minMemoryMapBytes: threshold that decides whether a file is read directly or memory-mapped via FileChannel.
MemoryManager
A key difference between Hadoop & Spark is Spark’s usage of memory.
MemoryPool
Spark uses both on heap memory and off heap memory.
- lock: lock instance
- poolSize: size of the pool (in bytes)
- memoryUsed: used memory.
- memoryFree: available memory
- incrementPoolSize: increase size of memory pool
- decrementPoolSize: decrease size of memory pool
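A minimal sketch of the bookkeeping these members describe (illustrative, not Spark's actual class; the lock object guards all mutations):

abstract class SimpleMemoryPool(lock: AnyRef) {
  private var _poolSize: Long = 0L

  def memoryUsed: Long                                      // provided by subclasses
  def poolSize: Long = lock.synchronized { _poolSize }
  def memoryFree: Long = lock.synchronized { _poolSize - memoryUsed }

  def incrementPoolSize(delta: Long): Unit = lock.synchronized {
    require(delta >= 0)
    _poolSize += delta
  }

  def decrementPoolSize(delta: Long): Unit = lock.synchronized {
    require(delta >= 0 && delta <= _poolSize && _poolSize - delta >= memoryUsed)
    _poolSize -= delta
  }
}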
StorageMemoryPool
An impl of MemoryPool
- acquireMemory(blockId: BlockId, numBytes: Long): acquire numBytes of memory for the Block identified by blockId.
- releaseMemory(size: Long)
- freeSpaceToShrinkPool(spaceToFree: Long)
MemoryManager interface
- conf: SparkConf
- numCores: CPU cores
- onHeapStorageMemory
- onHeapExecutionMemory
- onHeapStorageMemoryPool
- offHeapStorageMemoryPool
- onHeapExecutionMemoryPool
- offHeapExecutionMemoryPool
- maxOffHeapMemory
- offHeapStorageMemory
Methods:
- maxOnHeapStorageMemory
- maxOffHeapStorageMemory
- setMemoryStore
- acquireStorageMemory
- acquireUnrollMemory
- releaseStorageMemory
- releaseAllStorageMemory
- releaseUnrollMemory
- storageMemoryUsed
UnifiedMemoryManager
Unified memory management mechanism.
- conf: SparkConf
- maxHeapMemory: max memory on heap
- onHeapStorageRegionSize.
- numCores
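A back-of-the-envelope sketch of how the unified regions are sized, using what I believe are the Spark 2.x defaults (spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5, 300 MB reserved system memory); illustrative arithmetic only, not the exact implementation:

val systemMemory    = 4L * 1024 * 1024 * 1024   // e.g. a 4 GB executor heap (made-up figure)
val reservedMemory  = 300L * 1024 * 1024        // reserved for Spark internals
val memoryFraction  = 0.6                       // spark.memory.fraction
val storageFraction = 0.5                       // spark.memory.storageFraction

val maxHeapMemory           = ((systemMemory - reservedMemory) * memoryFraction).toLong
val onHeapStorageRegionSize = (maxHeapMemory * storageFraction).toLong
// Storage and execution can borrow from each other; the storage region size is only
// a soft boundary, and cached blocks may be evicted when execution needs the space.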
MemoryStore
Responsible for storing Block in memory.
- entries: mapping between BlockId and MemoryEntry
- onHeapUnrollMemoryMap
- offHeapUnrollMemoryMap
- unrollMemoryThreshold
Methods:
- getSize(blockId: BlockId): Long
- putBytes(blockId, size, memoryMode, bytes): Boolean
- reserveUnrollMemoryForThisTask
- releaseUnrollMemoryForThisTask
- putIteratorAsValues
- putIteratorAsBytes
- getBytes(blockId)
- getValues(blockId)
- remove
- clear
- evictBlocksToFreeSpace
- contains(blockId)
BlockManager
This is the CORE. Methods:
- reregister: register BlockManager to BlockManagerMaster and report Block information to BlockManagerMaster.
- getLocalBytes(blockId: BlockId) -> ChunkedByteBuffer
- getBlockData(blockId: BlockId) -> ManagedBuffer: get data from local Block.
- putBytes
- putBlockData
- getMatchingBlockIds
- getLocalValues
- getRemoteBytes(blockId: BlockId) : read Block data from remote.
- get(blockId: BlockId): read the local Block first; if it does not exist, read from remote.
- downgradeLock
- releaseLock
- registerTask(taskAttemptId: Long): register the task attempt with BlockInfoManager.
- releaseAllLocksForTask
- getOrElseUpdate
- putIterator
- getDiskWriter
- getSingle: get a Block consisting of a single object.
- putSingle
- dropFromMemory: drop a Block from memory; if its storage level allows disk, write it to disk first, otherwise discard it.
- removeBlock
- removeRdd(rddId: Int): remove all Blocks belonging to an RDD.
- removeBroadcast(broadcastId: Long, tellMaster: Boolean): remove all Blocks belonging to a Broadcast.
Management of BlockManager from BlockManagerMaster
All communication between the Driver and Executors about BlockManagers goes through BlockManagerMaster.
BlockManagerMaster:
- RemoveExecutor
- RegisterBlockManager
- UpdateBlockInfo
- GetLocations
- getLocationsMultipleBlockIds
- GetPeers
- GetExecutorEndpointRef
- RemoveBlock
- RemoveRdd
- RemoveShuffle
- RemoveBroadCast
- GetMemoryStatus
- GetStorageStatus
- GetBlockStatus
- GetMatchingBlockIds
- HasCachedBlocks
- StopBlockManagerMaster
Block Transfer Service
BlockManager uses NettyBlockTransferService to transfer blocks.
To be continued..
Chapter 7 Orchestration System
Overview
There are two levels of orchestration: cluster-level resource management (YARN, Mesos, or Spark standalone mode) is level 1; this chapter focuses on level 2, DAGScheduler & TaskScheduler.
Scheduling process:
- Build the operator DAG
- Split the graph into stages of tasks
- Launch tasks via the cluster manager
- Execute the tasks
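A tiny word-count job makes these steps concrete (assuming sc is an existing SparkContext; the input path is a placeholder). reduceByKey introduces a ShuffleDependency, so the DAG is split into a ShuffleMapStage and a ResultStage, each submitted to TaskScheduler as a TaskSet:

val counts = sc.textFile("hdfs:///path/to/input")   // placeholder path
  .flatMap(_.split(" "))                            // these transformations only build the DAG
  .map(word => (word, 1))
  .reduceByKey(_ + _)                               // shuffle boundary: new stage
  .collect()                                        // the action triggers DAGScheduler.runJob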
What is RDD
Why RDD?
1. Data Processing Model
RDD is a resilient parallel data structure. RDD provides a list of operators to transform RDDs.
2. Categorize Dependencies
Dependencies are divided into NarrowDependency & ShuffleDependency.
3. Performance
RDD allows parallel execution on multiple nodes.
4. Disaster Recovery
RDD is an immutable dataset, and a task failure can be recovered from checkpoints.
RDD Dependencies
1. NarrowDependency
Each partition of the parent RDD is used by at most one partition of the child RDD. NarrowDependency has two subclasses:
- OneToOneDependency
Child RDD partitions map one-to-one to parent RDD partitions with the same index.
- RangeDependency
A range of child partitions maps one-to-one to a range of parent partitions: the child partition with index partitionId corresponds to the parent partition with index partitionId - outStart + inStart.
2. ShuffleDependency
There is no one-to-one partition mapping between the child RDD and the parent RDD; a child partition may depend on multiple partitions of the upstream RDD.
Partitioner
Partitioner decides which partition of the downstream RDD each record of the upstream RDD's output goes to.
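For example, with the standard RDD API (assuming sc is an existing SparkContext), a HashPartitioner sends each key to the downstream partition chosen by hashing the key:

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("c", 4)))
val repartitioned = pairs.partitionBy(new HashPartitioner(4))  // 4 downstream partitions, keyed by hash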
Stages
DAGScheduler puts RDDs into different Stages and forms a dependency graph between them. Stages whose dependencies are all satisfied can run in parallel.
ResultStage
ResultStage is the final stage of a job; it applies a function to the RDD's partitions to compute the action's result.
ShuffleMapStage
ShuffleMapStage includes one or more ShuffleMapTasks, whose output serves as input for downstream stages.
DAGScheduler
DAGScheduler splits the RDD DAG into stages based on the dependencies between RDDs (stage boundaries sit at shuffles). All components talk to DAGScheduler by sending DAGSchedulerEvents.
Job submission
1. Submit the job
DAGScheduler has a runJob[T, U](...): Unit method to submit the job.
2. Process job submission
DAGSchedulerEventProcessLoop invokes DAGScheduler’s handleJobSubmitted method after receiving JobSubmitted event.
Workflow
- Spark composes the RDD DAG and calls DAGScheduler's runJob method.
- DAGScheduler sends a JobSubmitted event, and DAGSchedulerEventProcessLoop queues it.
- DAGSchedulerEventProcessLoop fetches the DAGSchedulerEvent and invokes its doOnReceive method to process the event.
- DAGSchedulerEventProcessLoop's doOnReceive invokes DAGScheduler's handleJobSubmitted method to process the event.
- DAGScheduler submits the most upstream stages' tasks to TaskScheduler first, then gradually submits downstream stages' tasks.
Scheduling Pool
All TaskSets are placed into a scheduling pool, and the pool schedules each TaskSet with a scheduling algorithm.
Algorithms
- FIFOSchedulingAlgorithm: compares priority first, then stageId.
- FairSchedulingAlgorithm: compares whether each schedulable is below its minShare, then the minShare usage ratio, then the ratio of running tasks to weight.
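To actually exercise FAIR scheduling, the mode and (optionally) a pool are chosen through configuration; a small example using the standard properties (the pool name "reporting" is made up, and per-pool weight/minShare normally come from a fairscheduler.xml file):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("fair-scheduling-demo")
  .setMaster("local[*]")
  .set("spark.scheduler.mode", "FAIR")      // switch the root pool from FIFO to FAIR
val sc = new SparkContext(conf)

// Jobs submitted from this thread are placed into the "reporting" pool.
sc.setLocalProperty("spark.scheduler.pool", "reporting")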
Pool Impl
Pool is similar to a priority queue.
TaskSetManager
TaskSetManager manages a TaskSet and allocates resources to its Tasks. TaskSchedulerImpl depends on TaskSetManager.
LauncherBackend
When the Spark application runs in a separate process rather than inside the user application, the user can communicate with it through LauncherServer.
BackendConnection
BackendConnection is an internal component of LauncherBackend; it connects to LauncherServer's socket to send & receive messages. It has the following methods:
- handle: process messages from LauncherServer
- run: read messages from client socket.
- send: talk to LauncherServer
- close: close the socket connection with LauncherServer.
LauncherBackend
LauncherBackend is the communication component between SchedulerBackend and LauncherServer.
SchedulerBackend
TaskScheduler allocates resources to Tasks through SchedulerBackend.
TaskResultGetter
TaskResultGetter deserializes the serialized results of Task execution.
TaskScheduler
This interface defines how to schedule tasks.
Chapter 8 Computing Engine
Overview
Spark’s computing engine includes execution memory and Shuffle.
- Execution Memory
Includes execution memory, TaskMemoryManager, MemoryConsumer, etc. Each Task has its own TaskMemoryManager. MemoryManager manages both JVM heap memory and off-heap (OS) memory.
- Shuffle
In early versions of Spark shuffle, a map task would:
- create a bucket for each reduce task, resulting in M x R buckets.
- write intermediate results to the buckets based on partition.
- reduce tasks then read blocks from either the local or a remote map task's BlockManager.
Problems with this solution:
- map tasks write intermediate results to memory and then to disk, which consumes a lot of memory.
- too many buckets result in frequent small I/O operations.
Solutions:
- Merge the partition outputs (buckets) for each reduce task into the same file to reduce the number of shuffle files.
- Map tasks output results record by record and use the AppendOnlyMap cache to aggregate intermediate data, greatly reducing memory consumption from intermediate results.
- Reduce tasks read intermediate results from map tasks record by record instead of reading everything at once, reducing memory consumption.
- Reduce tasks group the required Blocks by BlockManager address, reducing network I/O.
MemoryManager
ExecutionMemoryPool
Inherits MemoryPool; this is a logical memory pool rather than actual heap or off-heap memory. Some methods:
- acquireMemory: allocate numBytes memory for taskAttemptId.
- releaseMemory: release numBytes memory for taskAttemptId.
- releaseAllMemoryForTask: release all memory for taskAttemptId.
MemoryManager (interface)
MemoryManager’s methods:
- acquireExecutionMemory
- releaseExecutionMemory
- releaseAllExecutionMemoryForTask
- executionMemoryUsed
- getExecutionMemoryUsageForTask.
UnifiedMemoryManager (An impl of MemoryManager)
MemoryManager & Tungsten
- MemoryBlock
Tungsten implements a data structure similar to OS memory Page. It is a logical representation of either heap memory or off heap memory.
- MemoryAllocator
This has two types of memory allocation: HeapMemoryAllocator and UnsafeMemoryAllocator.
TaskMemoryManager
TaskMemoryManager manages memory allocation and release for individual task attempt. This depends on MemoryManager.
Task
TaskContext
TaskContext maintains the context when a Task is being executed. TaskContextImpl is the only implementation of TaskContext:
- stageId
- partitionId
- taskAttemptId
- attemptNumber
- taskMemoryManager
- metricsSystem
- taskMetrics
- onCompleteCallbacks
- onFailureCallbacks
- interrupted
- completed
- failed
- addTaskCompletionListener
- addTaskFailureListener
- markTaskFailed
- markInterrupted
- getMetricsSources
- registerAccumulator
Task is the smallest unit in a Spark job. ShuffleMapTask is similar to MapTask in Hadoop, and ResultTask is similar to ReduceTask in Hadoop.
IndexShuffleBlockResolver
ShuffleBlockResolver defines how Shuffle Blocks are resolved, including getting the Shuffle data file, getting the Shuffle index file, deleting Shuffle files, getting Shuffle Block data, etc.
Sampling & Estimate
In the shuffle stage of a Spark job, Spark uses caches and other in-memory data structures. These data structures require size estimates to decide when to spill to disk. SizeTracker defines how to sample and estimate sizes.
- SAMPLE_GROWTH_RATE: growth rate of sampling
- samples: list of samples
- bytesPerUpdate
- numUpdates
- nextSampleNum
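A simplified sketch of the sampling/extrapolation idea (illustrative, not Spark's code; I assume a growth rate of 1.1): full size measurements are taken only at exponentially spaced update counts, and sizes in between are extrapolated from the observed bytes-per-update:

case class Sample(size: Long, numUpdates: Long)

class SimpleSizeTracker(measure: () => Long, growthRate: Double = 1.1) {
  private var numUpdates = 0L
  private var nextSampleNum = 1L
  private var samples = List.empty[Sample]
  private var bytesPerUpdate = 0.0

  def afterUpdate(): Unit = {
    numUpdates += 1
    if (numUpdates == nextSampleNum) takeSample()
  }

  private def takeSample(): Unit = {
    samples = (Sample(measure(), numUpdates) :: samples).take(2)   // keep the two latest samples
    samples match {
      case latest :: previous :: _ =>
        bytesPerUpdate = math.max(0.0,
          (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates))
      case _ =>
    }
    nextSampleNum = math.ceil(numUpdates * growthRate).toLong      // next sample exponentially later
  }

  def estimateSize(): Long = samples.headOption.map { latest =>
    latest.size + (bytesPerUpdate * (numUpdates - latest.numUpdates)).toLong
  }.getOrElse(0L)
}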
WritablePartitionedPairCollection
WritablePartitionedPairCollection is a collection to track key and partitionId.
AppendOnlyMap
AppendOnlyMap can cache null values and supports up to 0.7 * 2^29 elements.
- initialCapacity: default to 64.
- mask: mask to store data location.
- capacity: capacity of current data array.
- data: store array of key and merged result.
- LOAD_FACTOR: load factor that triggers growth of the data array; 0.7.
- growThreshold
- curSize: number of items in data
- haveNullValue: whether the data array holds a null value.
- nullValue
- destroyed: whether the map has been destroyed and its data array can no longer be used.
- destructionMessage.
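A drastically simplified sketch of the layout described above (illustrative, not Spark's code: null handling, growth at LOAD_FACTOR, and the exact probing scheme are omitted or simplified). Keys and values sit side by side in one flat array, and changeValue merges a new value into an existing one so intermediate shuffle data can be aggregated in place:

class MiniAppendOnlyMap[K, V](initialCapacity: Int = 64) {   // capacity must be a power of two
  private var capacity = initialCapacity
  private var mask = capacity - 1
  private var data = new Array[AnyRef](2 * capacity)         // data(2*pos) = key, data(2*pos+1) = value
  private var curSize = 0

  def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    var pos = key.hashCode & mask
    var delta = 1
    while (true) {
      val curKey = data(2 * pos)
      if (curKey == null) {                                  // empty slot: insert a new entry
        val newValue = updateFunc(false, null.asInstanceOf[V])
        data(2 * pos) = key.asInstanceOf[AnyRef]
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        curSize += 1
        return newValue
      } else if (curKey == key) {                            // existing key: merge the values
        val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        return newValue
      } else {                                               // collision: probe with an increasing step
        pos = (pos + delta) & mask
        delta += 1
      }
    }
    throw new IllegalStateException("unreachable")
  }
}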
PartitionedPairBuffer
Caches key-value pairs in memory and sorts them. It inherits both WritablePartitionedPairCollection and SizeTracker.
ExternalSorter
ExternalSorter is used to sort output data on either the map or the reduce side. Spark has two external sorters: ExternalSorter and ShuffleExternalSorter. ExternalSorter stores map task output in the JVM heap and merges the data if an aggregation function is specified. ShuffleExternalSorter is for sorting Shuffle data; it does not implement persistence itself and relies on UnsafeShuffleWriter to persist the data.
ShuffleManager
ShuffleManager manages Shuffle operations. ShuffleManager only has one SortShuffleManager implementation so far.
ShuffleWriter
SortShuffleManager depends on ShuffleWriter. ShuffleWriter has three subclasses: SortShuffleWriter, UnsafeShuffleWriter, and BypassMergeSortShuffleWriter.
- SortShuffleWriter: uses ExternalSorter as its sorter and supports merging of shuffle data. Its key functionality is to write a map task's output to disk. When the mapSideCombine property is false, key-value pairs are not aggregated.
- BypassMergeSortShuffleWriter: a ShuffleWriter without sort & merge.
- UnsafeShuffleWriter: uses ShuffleExternalSorter as its external sorter, so it cannot merge (aggregate) data. UnsafeShuffleWriter uses Tungsten memory to improve disk I/O.
- ShuffleBlockFetcherIterator: an iterator that fetches multiple Blocks. If a Block is local, it is fetched from the local BlockManager; otherwise ShuffleClient requests the Block from the remote BlockTransferService.
- BlockStoreShuffleReader: used to read Block data in the partition range [startPartition, endPartition) from other nodes.
Combination of Shuffle at Map & Reduce
Merge in both map & reduce
- ShuffleDependency has mapSideCombine = true
- merge function is specified
- ShuffleDependency doesn’t allow serialization.
Cache in map and merge in reduce.
- ShuffleDependency has mapSideCombine = false
- ShuffleDependency’s partition number is larger than spark.shuffle.sort.bypassMergeThreshold.
- ShuffleDependency doesn’t support serialization.
- merge function specified.
Cache in map and no merge in reduce
- ShuffleDependency has mapSideCombine = false
- ShuffleDependency’s partition number is larger than spark.shuffle.sort.bypassMergeThreshold.
- ShuffleDependency doesn’t support serialization.
- merge function not specified.
Bypass sort & merge in map and no merge in reduce
- ShuffleDependency has mapSideCombine = false
- ShuffleDependency’s partition number is smaller than spark.shuffle.sort.bypassMergeThreshold.
- merge function not specified.
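A condensed sketch of how these four cases translate into a writer choice (simplified from my reading of SortShuffleManager.registerShuffle; the partition-count cap on the Unsafe path and other details are omitted):

def chooseWriter(mapSideCombine: Boolean,
                 hasAggregator: Boolean,
                 serializerSupportsRelocation: Boolean,
                 numPartitions: Int,
                 bypassMergeThreshold: Int): String = {
  if (!mapSideCombine && numPartitions <= bypassMergeThreshold) {
    "BypassMergeSortShuffleWriter"   // few partitions, no map-side merge: skip sorting entirely
  } else if (serializerSupportsRelocation && !hasAggregator) {
    "UnsafeShuffleWriter"            // serialized (Tungsten) shuffle: sort without deserializing
  } else {
    "SortShuffleWriter"              // general path: ExternalSorter with optional map-side aggregation
  }
}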
Chapter 9 Deployment
Spark supports the following types of deployments:
- local: debugging
- local-cluster: debugging
- standalone: spark://host:port. HA + distributed; suitable for production.
- 3rd party deployment: YARN / Mesos / Zk / Simr
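The deployment mode is picked by the master URL; a small example with placeholder host/port values:

import org.apache.spark.{SparkConf, SparkContext}

val localConf      = new SparkConf().setAppName("demo").setMaster("local[*]")            // local debugging
val standaloneConf = new SparkConf().setAppName("demo").setMaster("spark://master:7077") // standalone cluster
val yarnConf       = new SparkConf().setAppName("demo").setMaster("yarn")                // 3rd-party manager

val sc = new SparkContext(localConf)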
HeartbeatReceiver
HeartbeatReceiver runs in the Driver and receives Heartbeats from Executors. It implements the following methods:
- addExecutor(executorId: String): called after a SparkListenerExecutorAdded event is received.
- onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved)
- expireDeadHosts(): check for hosts whose heartbeats have expired.
- Heartbeat: returns a HeartbeatResponse to the executor that sent the heartbeat.
Executor Implementation
Sends heartbeats and executes tasks.
Local Deployment
Local deployment only has Driver, no Master or Worker. Executor and Driver are in the same JVM process.
PersistenceEngine
PersistenceEngine persists the Master's state so that, when the Master fails and another Master is elected, the state can be recovered. PersistenceEngine defines how the Master's state information is serialized.
- FileSystemPersistenceEngine: filesystem-based persistence engine.
- ZooKeeperPersistenceEngine: ZooKeeper-based persistence engine.
Leader Election
Make sure only one Master is in Active mode.
Local Deployment, Local Cluster Deployment & 3rd Party Deployment
Chapter 10 Spark API
DataSource
DataSource is responsible for looking up, creating, and analyzing relations across different data sources.
Checkpoints
Checkpointing is used for disaster recovery. It persists in-memory data structures to disk. Spark uses checkpoints primarily to store RDD execution state and avoid recomputing it.
RDD
Transform API
- mapPartitions: transform RDD into MapPartitionsRDD.
- mapPartitionsWithIndex: apply a function that also receives the partition index to every partition, producing a MapPartitionsRDD.
- mapPartitionsWithIndexInternal: create a MapPartitionsRDD that applies a function to every partition.
- flatMap
- map
- toJavaRDD
Action API
Transform API won’t execute the actual operations, action APIs will.
- collect: will invoke runJob in SparkContext
- foreach: will invoke runJob in SparkContext
- reduce: will invoke runJob in SparkContext
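A minimal illustration of the difference (assuming sc is an existing SparkContext):

val squares = sc.parallelize(1 to 10).map(x => x * x)   // transformation: only builds a MapPartitionsRDD
val total   = squares.reduce(_ + _)                     // action: invokes runJob and returns 385
squares.foreach(println)                                // action: runs another job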
Checkpoint API
Users must explicitly call checkpoint() to enable checkpointing.
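A small usage example (assuming sc is an existing SparkContext; the checkpoint directory is a placeholder). checkpoint() must be called before the first action on the RDD:

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")   // placeholder path
val rdd = sc.parallelize(1 to 100).map(_ * 2)
rdd.checkpoint()                                       // mark the RDD for checkpointing
rdd.count()                                            // the action computes the RDD and writes the checkpoint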
Dataset
A strongly typed collection.
DataFrameReader
DataFrameReader loads data from different external data sources into a DataFrame or Dataset.
SparkSession
Centralized entry point for Spark operations.
- sparkContext
- sharedState
- sessionState
- sqlContext
- conf
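A minimal entry-point example tying SparkSession, DataFrameReader, and Dataset together (app name and file path are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("spark-api-demo")
  .master("local[*]")
  .getOrCreate()

// DataFrameReader: load an external source into a DataFrame.
val df = spark.read.json("path/to/people.json")   // placeholder path

// Dataset: a strongly typed collection.
import spark.implicits._
val ds = Seq(1, 2, 3).toDS()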
Thoughts
Chapters 1–5 have the best ROI: they are short but provide a fairly good overview of Spark. Chapters 6–10 are much more detailed, all the way down to the code level; this may be useful for day-to-day Spark core developers but less so for readers who just want a better understanding of Spark's architecture.