RocksDB On Flink

  • RocksDB is used in many mature OLAP engines, including Doris and Kudu, and it also has a place in stream processing engines. In this article we walk through RocksDB's integration in Flink to learn how RocksJava is used in practice, and to take a deeper look at Flink's StateBackend.

Prerequisites

  • Familiarity with RocksDB concepts, data structures, etc.
  • Familiarity with the Flink RocksDB StateBackend
  • A working Flink source-code environment

Code structure

  • Iterator: iterators for RocksDB scans, built as wrapper classes on top of RocksIteratorInterface; they include queue iterators, single-state iterators, state key and namespace iterators, and (key-group, kv-state) iterators.
  • restore: restores RocksDB snapshots from state; the restore result mainly carries the RocksDB instance, the column family handles, the RocksDB metrics collector, the SST files, and the last checkpoint id (see the RocksDBRestoreResult class).
    • RocksDBRestoreOperation: the main interface for RocksDB state restore operations, offering incremental restore, full-snapshot restore, and no-restore strategies.
  • snapshot: RocksDB snapshotting utilities, covering both full and incremental snapshots.

StateBackend

  • The state backend interface provided by Flink; by implementing it you can plug a custom state management backend into Flink.

EmbeddedRocksDBStateBackend

  • The embedded RocksDB state backend stores state in an embedded RocksDB instance. It can hold very large state, spilling to local disk once memory is exceeded. All key/value state (including windows) is stored in RocksDB, indexed by key/value. Configuring CheckpointStorage protects against data loss when a machine crashes; a usage sketch follows.
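  • A minimal wiring sketch, assuming Flink 1.13+ (where EmbeddedRocksDBStateBackend replaced the older RocksDBStateBackend); the checkpoint path is illustrative:

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // incremental checkpoints: each checkpoint uploads only the newly created SST files
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        // durable checkpoint storage guards against machine crashes (illustrative path)
        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");
        // ... define sources/operators, then env.execute("job");
    }
}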

Field semantics

  /** The number of (re)tries for loading the RocksDB JNI library. */
    // retry count for loading the RocksDB JNI library
    private static final int ROCKSDB_LIB_LOADING_ATTEMPTS = 3;

    /** Flag whether the native library has been loaded. */
    // ensures RocksDB is initialized only once per JVM
    private static boolean rocksDbInitialized = false;

    // default number of threads used to transfer (download and upload) files, per stateful operator
    private static final int UNDEFINED_NUMBER_OF_TRANSFER_THREADS = -1;

    // default write batch size; -1 means not configured
    private static final long UNDEFINED_WRITE_BATCH_SIZE = -1;

    // ------------------------------------------------------------------------

    // -- configuration values, set in the application / configuration

    /**
     * Base paths for RocksDB directory, as configured. Null if not yet set, in which case the
     * configuration values will be used. The configuration defaults to the TaskManager's temp
     * directories.
     */
    @Nullable private File[] localRocksDbDirectories;

    /** The pre-configured option settings. */
    // pre-configured RocksDB options, covering both DBOptions and ColumnFamilyOptions
    @Nullable private PredefinedOptions predefinedOptions;

    /** The options factory to create the RocksDB options in the cluster. */
    // the cluster's RocksDB DBOptions and column family options, e.g. compaction settings and DB thread counts
    @Nullable private RocksDBOptionsFactory rocksDbOptionsFactory;

    /** This determines if incremental checkpointing is enabled. */
    private final TernaryBoolean enableIncrementalCheckpointing;

    /** Thread number used to transfer (download and upload) state, default value: 1. */
    // number of threads for transferring RocksDB files
    private int numberOfTransferThreads;

    /** The configuration for memory settings (pool sizes, etc.). */
    // RocksDB memory settings, including managed memory and fixed memory sizes
    private final RocksDBMemoryConfiguration memoryConfiguration;

    /** This determines the type of priority queue state. */
    // decides which implementation backs the timer service, ROCKSDB or HEAP; HEAP can run out of memory
    @Nullable private EmbeddedRocksDBStateBackend.PriorityQueueStateType priorityQueueStateType;

    /** The default rocksdb metrics options. */
    private final RocksDBNativeMetricOptions defaultMetricOptions;

    // -- runtime values, set on TaskManager when initializing / using the backend

    /** Base paths for RocksDB directory, as initialized. */
    // default base paths used when initializing RocksDB
    private transient File[] initializedDbBasePaths;

    /** JobID for uniquifying backup paths. */
    private transient JobID jobId;

    /** The index of the next directory to be used from {@link #initializedDbBasePaths}. */
    private transient int nextDirectory;

    /** Whether we already lazily initialized our local storage directories. */
    private transient boolean isInitialized;

    /**
     * Max consumed memory size for one batch in {@link RocksDBWriteBatchWrapper}, default value
     * 2mb.
     */
    private long writeBatchSize;

Core constructor

  private EmbeddedRocksDBStateBackend(
            EmbeddedRocksDBStateBackend original, ReadableConfig config, ClassLoader classLoader) {
        // configure incremental checkpoints
        this.enableIncrementalCheckpointing =
                original.enableIncrementalCheckpointing.resolveUndefined(
                        config.get(CheckpointingOptions.INCREMENTAL_CHECKPOINTS));

        if (original.numberOfTransferThreads == UNDEFINED_NUMBER_OF_TRANSFER_THREADS) {
            this.numberOfTransferThreads = config.get(CHECKPOINT_TRANSFER_THREAD_NUM);
        } else {
            this.numberOfTransferThreads = original.numberOfTransferThreads;
        }

        if (original.writeBatchSize == UNDEFINED_WRITE_BATCH_SIZE) {
            this.writeBatchSize = config.get(WRITE_BATCH_SIZE).getBytes();
        } else {
            this.writeBatchSize = original.writeBatchSize;
        }

        this.memoryConfiguration =
                RocksDBMemoryConfiguration.fromOtherAndConfiguration(
                        original.memoryConfiguration, config);
        this.memoryConfiguration.validate();

        if (null == original.priorityQueueStateType) {
            this.priorityQueueStateType = config.get(TIMER_SERVICE_FACTORY);
        } else {
            this.priorityQueueStateType = original.priorityQueueStateType;
        }

        // configure local directories
        if (original.localRocksDbDirectories != null) {
            this.localRocksDbDirectories = original.localRocksDbDirectories;
        } else {
            final String rocksdbLocalPaths = config.get(RocksDBOptions.LOCAL_DIRECTORIES);
            if (rocksdbLocalPaths != null) {
                String[] directories = rocksdbLocalPaths.split(",|" + File.pathSeparator);

                try {
                    setDbStoragePaths(directories);
                } catch (IllegalArgumentException e) {
                    throw new IllegalConfigurationException(
                            "Invalid configuration for RocksDB state "
                                    + "backend's local storage directories: "
                                    + e.getMessage(),
                            e);
                }
            }
        }

        // configure metric options
        this.defaultMetricOptions = RocksDBNativeMetricOptions.fromConfig(config);

        // configure RocksDB predefined options
        this.predefinedOptions =
                original.predefinedOptions == null
                        ? PredefinedOptions.valueOf(config.get(RocksDBOptions.PREDEFINED_OPTIONS))
                        : original.predefinedOptions;
        LOG.info("Using predefined options: {}.", predefinedOptions.name());

        // configure RocksDB options factory
        try {
            // load the RocksDB options; a custom RocksDBOptionsFactory implementation can be configured to supply custom RocksDB settings
            rocksDbOptionsFactory =
                    configureOptionsFactory(
                            original.rocksDbOptionsFactory,
                            config.get(RocksDBOptions.OPTIONS_FACTORY),
                            config,
                            classLoader);
        } catch (DynamicCodeLoadingException e) {
            throw new FlinkRuntimeException(e);
        }

        // configure latency tracking
        latencyTrackingConfigBuilder = original.latencyTrackingConfigBuilder.configure(config);
    }
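  • The copy constructor above is what runs when a backend is re-configured from a Flink Configuration. A hedged sketch of driving it through configure(); the key names come from RocksDBOptions/RocksDBConfigurableOptions and should be checked against your Flink version:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;

public class ConfigureBackendExample {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.setString("state.backend.incremental", "true");
        // comma-separated list, split exactly as in the constructor above
        config.setString("state.backend.rocksdb.localdir", "/data/rdb1,/data/rdb2");
        config.setString("state.backend.rocksdb.checkpoint.transfer.thread.num", "4");
        config.setString("state.backend.rocksdb.write-batch-size", "4mb");

        // configure() invokes the private copy constructor shown above; values already
        // set on the original backend take precedence over the configuration
        EmbeddedRocksDBStateBackend backend =
                new EmbeddedRocksDBStateBackend()
                        .configure(config, ConfigureBackendExample.class.getClassLoader());
    }
}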

createKeyedStateBackend

  • Creates the keyed state backend; internally this builds the RocksDB instance along with the surrounding backend components.
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
            Environment env,
            JobID jobID,
            String operatorIdentifier,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            TaskKvStateRegistry kvStateRegistry,
            TtlTimeProvider ttlTimeProvider,
            MetricGroup metricGroup,
            @Nonnull Collection<KeyedStateHandle> stateHandles,
            CloseableRegistry cancelStreamRegistry,
            double managedMemoryFraction)
            throws IOException {

        // first, make sure that the RocksDB JNI library is loaded
        // we do this explicitly here to have better error handling
        // pick a temp directory
        String tempDir = env.getTaskManagerInfo().getTmpDirectories()[0];
        // initialize RocksDB: load the JNI library, external libraries, etc.
        ensureRocksDBIsLoaded(tempDir);

        // replace all characters that are not legal for filenames with underscore
        // sanitize the operator name
        String fileCompatibleIdentifier = operatorIdentifier.replaceAll("[^a-zA-Z0-9\\-]", "_");

        // initialize the RocksDB local directories
        lazyInitializeForJob(env, fileCompatibleIdentifier);

        File instanceBasePath =
                new File(
                        getNextStoragePath(),
                        "job_"
                                + jobId
                                + "_op_"
                                + fileCompatibleIdentifier
                                + "_uuid_"
                                + UUID.randomUUID());

        LocalRecoveryConfig localRecoveryConfig =
                env.getTaskStateManager().createLocalRecoveryConfig();

        final OpaqueMemoryResource<RocksDBSharedResources> sharedResources =
                RocksDBOperationUtils.allocateSharedCachesIfConfigured(
                        memoryConfiguration, env.getMemoryManager(), managedMemoryFraction, LOG);
        if (sharedResources != null) {
            LOG.info("Obtained shared RocksDB cache of size {} bytes", sharedResources.getSize());
        }
        final RocksDBResourceContainer resourceContainer =
                createOptionsAndResourceContainer(sharedResources);

        ExecutionConfig executionConfig = env.getExecutionConfig();
        StreamCompressionDecorator keyGroupCompressionDecorator =
                getCompressionDecorator(executionConfig);

        LatencyTrackingStateConfig latencyTrackingStateConfig =
                latencyTrackingConfigBuilder.setMetricGroup(metricGroup).build();
        // build the keyed state backend
        RocksDBKeyedStateBackendBuilder<K> builder =
                new RocksDBKeyedStateBackendBuilder<>(
                                operatorIdentifier,
                                env.getUserCodeClassLoader().asClassLoader(),
                                instanceBasePath,
                                resourceContainer,
                                stateName -> resourceContainer.getColumnOptions(),
                                kvStateRegistry,
                                keySerializer,
                                numberOfKeyGroups,
                                keyGroupRange,
                                executionConfig,
                                localRecoveryConfig,
                                getPriorityQueueStateType(),
                                ttlTimeProvider,
                                latencyTrackingStateConfig,
                                metricGroup,
                                stateHandles,
                                keyGroupCompressionDecorator,
                                cancelStreamRegistry)
                        .setEnableIncrementalCheckpointing(isIncrementalCheckpointsEnabled())
                        .setNumberOfTransferingThreads(getNumberOfTransferThreads())
                        .setNativeMetricOptions(
                                resourceContainer.getMemoryWatcherOptions(defaultMetricOptions))
                        .setWriteBatchSize(getWriteBatchSize());
        return builder.build();
    }

// the build() method
 public RocksDBKeyedStateBackend<K> build() throws BackendBuildingException {
        // construct the basic RocksDB components
        RocksDBWriteBatchWrapper writeBatchWrapper = null;
        ColumnFamilyHandle defaultColumnFamilyHandle = null;
        RocksDBNativeMetricMonitor nativeMetricMonitor = null;
        CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
        LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation =
                new LinkedHashMap<>();
        LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates =
                new LinkedHashMap<>();
        RocksDB db = null;
        // the state restore operation
        RocksDBRestoreOperation restoreOperation = null;
        RocksDbTtlCompactFiltersManager ttlCompactFiltersManager =
                new RocksDbTtlCompactFiltersManager(ttlTimeProvider);

        ResourceGuard rocksDBResourceGuard = new ResourceGuard();
        RocksDBSnapshotStrategyBase<K, ?> checkpointStrategy;
        PriorityQueueSetFactory priorityQueueFactory;
        SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
        // Number of bytes required to prefix the key groups.
        int keyGroupPrefixBytes =
                CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(
                        numberOfKeyGroups);

        try {
            // Variables for snapshot strategy when incremental checkpoint is enabled
            UUID backendUID = UUID.randomUUID();
            SortedMap<Long, Set<StateHandleID>> materializedSstFiles = new TreeMap<>();
            long lastCompletedCheckpointId = -1L;
            if (injectedTestDB != null) {
                db = injectedTestDB;
                defaultColumnFamilyHandle = injectedDefaultColumnFamilyHandle;
                nativeMetricMonitor =
                        nativeMetricOptions.isEnabled()
                                ? new RocksDBNativeMetricMonitor(
                                        nativeMetricOptions, metricGroup, db)
                                : null;
            } else {
                // prepare the instance directories
                prepareDirectories();
                // obtain the state restore operation
                restoreOperation =
                        getRocksDBRestoreOperation(
                                keyGroupPrefixBytes,
                                cancelStreamRegistry,
                                kvStateInformation,
                                registeredPQStates,
                                ttlCompactFiltersManager);
                // restore state and obtain the RocksDBRestoreResult, which holds the RocksDB instance, the last completed checkpoint id, etc.
                RocksDBRestoreResult restoreResult = restoreOperation.restore();
                // the restored RocksDB instance
                db = restoreResult.getDb();
                defaultColumnFamilyHandle = restoreResult.getDefaultColumnFamilyHandle();
                nativeMetricMonitor = restoreResult.getNativeMetricMonitor();
                // for incremental checkpoints, also take the backendUID, the last completed checkpoint id, and the SST files needed for incremental restore
                if (restoreOperation instanceof RocksDBIncrementalRestoreOperation) {
                    backendUID = restoreResult.getBackendUID();
                    materializedSstFiles = restoreResult.getRestoredSstFiles();
                    lastCompletedCheckpointId = restoreResult.getLastCompletedCheckpointId();
                }
            }

            writeBatchWrapper =
                    new RocksDBWriteBatchWrapper(
                            db, optionsContainer.getWriteOptions(), writeBatchSize);

            // It is important that we only create the key builder after the restore, and not
            // before; restore operations may reconfigure the key serializer, so only now can we
            // be certain that the key serializer used in the builder is final.
            sharedRocksKeyBuilder =
                    new SerializedCompositeKeyBuilder<>(
                            keySerializerProvider.currentSchemaSerializer(),
                            keyGroupPrefixBytes,
                            32);
            // init snapshot strategy after db is assured to be initialized
            checkpointStrategy =
                    // initialize the savepoint and checkpoint strategies
                    initializeSavepointAndCheckpointStrategies(
                            cancelStreamRegistryForBackend,
                            rocksDBResourceGuard,
                            kvStateInformation,
                            registeredPQStates,
                            keyGroupPrefixBytes,
                            db,
                            backendUID,
                            materializedSstFiles,
                            lastCompletedCheckpointId);
            // init priority queue factory
            priorityQueueFactory =
                    initPriorityQueueFactory(
                            keyGroupPrefixBytes,
                            kvStateInformation,
                            db,
                            writeBatchWrapper,
                            nativeMetricMonitor);
        } catch (Throwable e) {
            // Do clean up
            List<ColumnFamilyOptions> columnFamilyOptions =
                    new ArrayList<>(kvStateInformation.values().size());
            IOUtils.closeQuietly(cancelStreamRegistryForBackend);
            IOUtils.closeQuietly(writeBatchWrapper);
            RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(
                    columnFamilyOptions, defaultColumnFamilyHandle);
            IOUtils.closeQuietly(defaultColumnFamilyHandle);
            IOUtils.closeQuietly(nativeMetricMonitor);
            for (RocksDBKeyedStateBackend.RocksDbKvStateInfo kvStateInfo :
                    kvStateInformation.values()) {
                RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(
                        columnFamilyOptions, kvStateInfo.columnFamilyHandle);
                IOUtils.closeQuietly(kvStateInfo.columnFamilyHandle);
            }
            IOUtils.closeQuietly(db);
            // it's possible that db has been initialized but later restore steps failed
            IOUtils.closeQuietly(restoreOperation);
            IOUtils.closeAllQuietly(columnFamilyOptions);
            IOUtils.closeQuietly(optionsContainer);
            ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
            kvStateInformation.clear();
            try {
                FileUtils.deleteDirectory(instanceBasePath);
            } catch (Exception ex) {
                logger.warn("Failed to delete base path for RocksDB: " + instanceBasePath, ex);
            }
            // Log and rethrow
            if (e instanceof BackendBuildingException) {
                throw (BackendBuildingException) e;
            } else {
                String errMsg = "Caught unexpected exception.";
                logger.error(errMsg, e);
                throw new BackendBuildingException(errMsg, e);
            }
        }
        InternalKeyContext<K> keyContext =
                new InternalKeyContextImpl<>(keyGroupRange, numberOfKeyGroups);
        logger.info("Finished building RocksDB keyed state-backend at {}.", instanceBasePath);
        return new RocksDBKeyedStateBackend<>(
                this.userCodeClassLoader,
                this.instanceBasePath,
                this.optionsContainer,
                columnFamilyOptionsFactory,
                this.kvStateRegistry,
                this.keySerializerProvider.currentSchemaSerializer(),
                this.executionConfig,
                this.ttlTimeProvider,
                latencyTrackingStateConfig,
                db,
                kvStateInformation,
                registeredPQStates,
                keyGroupPrefixBytes,
                cancelStreamRegistryForBackend,
                this.keyGroupCompressionDecorator,
                rocksDBResourceGuard,
                checkpointStrategy,
                writeBatchWrapper,
                defaultColumnFamilyHandle,
                nativeMetricMonitor,
                sharedRocksKeyBuilder,
                priorityQueueFactory,
                ttlCompactFiltersManager,
                keyContext,
                writeBatchSize);
    }

Default Flink RocksDB options factory

  • The DefaultConfigurableOptionsFactory class holds the default configuration for DBOptions, ColumnFamilyOptions, WriteOptions, and so on. A custom-factory sketch follows the code below.
 @Override
    public DBOptions createDBOptions(
            DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        if (isOptionConfigured(MAX_BACKGROUND_THREADS)) {
            // flush & compaction thread count
            currentOptions.setIncreaseParallelism(getMaxBackgroundThreads());
        }

        // max number of open files
        if (isOptionConfigured(MAX_OPEN_FILES)) {
            currentOptions.setMaxOpenFiles(getMaxOpenFiles());
        }
        
        if (isOptionConfigured(LOG_LEVEL)) {
            currentOptions.setInfoLogLevel(getLogLevel());
        }

        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // compaction style
        if (isOptionConfigured(COMPACTION_STYLE)) {
            currentOptions.setCompactionStyle(getCompactionStyle());
        }
        // whether to use dynamic level sizing
        if (isOptionConfigured(USE_DYNAMIC_LEVEL_SIZE)) {
            currentOptions.setLevelCompactionDynamicLevelBytes(getUseDynamicLevelSize());
        }
        // base target file size
        if (isOptionConfigured(TARGET_FILE_SIZE_BASE)) {
            currentOptions.setTargetFileSizeBase(getTargetFileSizeBase());
        }
        
        if (isOptionConfigured(MAX_SIZE_LEVEL_BASE)) {
            currentOptions.setMaxBytesForLevelBase(getMaxSizeLevelBase());
        }
        // write buffer size per column family
        if (isOptionConfigured(WRITE_BUFFER_SIZE)) {
            currentOptions.setWriteBufferSize(getWriteBufferSize());
        }

        if (isOptionConfigured(MAX_WRITE_BUFFER_NUMBER)) {
            currentOptions.setMaxWriteBufferNumber(getMaxWriteBufferNumber());
        }

        if (isOptionConfigured(MIN_WRITE_BUFFER_NUMBER_TO_MERGE)) {
            currentOptions.setMinWriteBufferNumberToMerge(getMinWriteBufferNumberToMerge());
        }

        TableFormatConfig tableFormatConfig = currentOptions.tableFormatConfig();
        // table format config
        BlockBasedTableConfig blockBasedTableConfig;
        if (tableFormatConfig == null) {
            blockBasedTableConfig = new BlockBasedTableConfig();
        } else {
            if (tableFormatConfig instanceof PlainTableConfig) {
                // if the table format config is PlainTableConfig, we just return current
                // column-family options
                return currentOptions;
            } else {
                blockBasedTableConfig = (BlockBasedTableConfig) tableFormatConfig;
            }
        }

        if (isOptionConfigured(BLOCK_SIZE)) {
            blockBasedTableConfig.setBlockSize(getBlockSize());
        }

        if (isOptionConfigured(METADATA_BLOCK_SIZE)) {
            blockBasedTableConfig.setMetadataBlockSize(getMetadataBlockSize());
        }

        if (isOptionConfigured(BLOCK_CACHE_SIZE)) {
            blockBasedTableConfig.setBlockCacheSize(getBlockCacheSize());
        }

        return currentOptions.setTableFormatConfig(blockBasedTableConfig);
    }
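  • To override these defaults, a job can ship its own factory implementing the same two hooks and register it via EmbeddedRocksDBStateBackend#setRocksDBOptions or the state.backend.rocksdb.options-factory key. A minimal sketch (the class name and tuning values are ours):

import java.util.Collection;

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;

import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionStyle;
import org.rocksdb.DBOptions;

public class MyRocksDBOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(
            DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // more shared background threads for flush and compaction
        return currentOptions.setIncreaseParallelism(8).setMaxOpenFiles(-1);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions
                .setCompactionStyle(CompactionStyle.LEVEL)
                .setWriteBufferSize(64 * 1024 * 1024); // 64 MB memtable per column family
    }
}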

RocksDB in practice inside the StateBackend

Using iterators

  • Key iterator
  public <N> Stream<K> getKeys(String state, N namespace) {
        // look up the RocksDB kv-state info
        RocksDbKvStateInfo columnInfo = kvStateInformation.get(state);
        if (columnInfo == null
                || !(columnInfo.metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo)) {
            return Stream.empty();
        }

        RegisteredKeyValueStateBackendMetaInfo<N, ?> registeredKeyValueStateBackendMetaInfo =
                (RegisteredKeyValueStateBackendMetaInfo<N, ?>) columnInfo.metaInfo;

        final TypeSerializer<N> namespaceSerializer =
                registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
        final DataOutputSerializer namespaceOutputView = new DataOutputSerializer(8);
        boolean ambiguousKeyPossible =
                CompositeKeySerializationUtils.isAmbiguousKeyPossible(
                        getKeySerializer(), namespaceSerializer);
        final byte[] nameSpaceBytes;
        try {
            CompositeKeySerializationUtils.writeNameSpace(
                    namespace, namespaceSerializer, namespaceOutputView, ambiguousKeyPossible);
            nameSpaceBytes = namespaceOutputView.getCopyOfBuffer();
        } catch (IOException ex) {
            throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
        }

        // create the RocksDB iterator
        RocksIteratorWrapper iterator =
                RocksDBOperationUtils.getRocksIterator(
                        db, columnInfo.columnFamilyHandle, readOptions);
        // seek to the first key
        iterator.seekToFirst();
        // wrap it in a state-keys iterator that deserializes each RocksDB key
        final RocksStateKeysIterator<K> iteratorWrapper =
                new RocksStateKeysIterator<>(
                        iterator,
                        state,
                        getKeySerializer(),
                        keyGroupPrefixBytes,
                        ambiguousKeyPossible,
                        nameSpaceBytes);

        Stream<K> targetStream =
                StreamSupport.stream(
                        Spliterators.spliteratorUnknownSize(iteratorWrapper, Spliterator.ORDERED),
                        false);
        // return the key stream
        return targetStream.onClose(iteratorWrapper::close);
    }
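  • The stream's onClose hook is what releases the native iterator, so callers must close the stream. A hedged usage sketch: getKeys is internal API, "my-value-state" is illustrative, and backend stands for the RocksDBKeyedStateBackend the runtime created:

import java.util.stream.Stream;

import org.apache.flink.runtime.state.VoidNamespace;

// inside runtime code holding a RocksDBKeyedStateBackend<String> named 'backend'
try (Stream<String> keys = backend.getKeys("my-value-state", VoidNamespace.INSTANCE)) {
    keys.forEach(System.out::println); // each element is a deserialized user key
}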

State operations

RocksDBListState

  • On RocksDB the whole list is stored as a single key/value pair; appends to the value are implemented with RocksDB's MergeOperator. A standalone sketch of that operator follows the code below.
  @Override
    public List<V> getInternal() {
        try {
            byte[] key = serializeCurrentKeyWithGroupAndNamespace();
            byte[] valueBytes = backend.db.get(columnFamily, key);
            // deserialize the value bytes into the result list
            return listSerializer.deserializeList(valueBytes, elementSerializer);
        } catch (RocksDBException e) {
            throw new FlinkRuntimeException("Error while retrieving data from RocksDB", e);
        }
    }

    @Override
    public void add(V value) {
        Preconditions.checkNotNull(value, "You cannot add null to a ListState.");

        try {
            // the list lives in one large kv pair; a RocksDB merge with the pre-registered merge operator implements append
            backend.db.merge(
                    columnFamily,
                    writeOptions,
                    serializeCurrentKeyWithGroupAndNamespace(),
                    serializeValue(value, elementSerializer));
        } catch (Exception e) {
            throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
        }
    }

    @Override
    public void mergeNamespaces(N target, Collection<N> sources) {
        if (sources == null || sources.isEmpty()) {
            return;
        }

        try {
            // create the target full-binary-key
            // switch to the target namespace
            setCurrentNamespace(target);
            // serialize the namespaced key
            final byte[] targetKey = serializeCurrentKeyWithGroupAndNamespace();

            // merge the sources to the target
            // merge each source's data into the target
            for (N source : sources) {
                if (source != null) {
                    setCurrentNamespace(source);
                    final byte[] sourceKey = serializeCurrentKeyWithGroupAndNamespace();

                    byte[] valueBytes = backend.db.get(columnFamily, sourceKey);

                    if (valueBytes != null) {
                        backend.db.delete(columnFamily, writeOptions, sourceKey);
                        // merge the sourceKey's data into the targetKey
                        backend.db.merge(columnFamily, writeOptions, targetKey, valueBytes);
                    }
                }
            }
        } catch (Exception e) {
            throw new FlinkRuntimeException("Error while merging state in RocksDB", e);
        }
    }

    @Override
    public void update(List<V> valueToStore) {
        updateInternal(valueToStore);
    }

    @Override
    public void updateInternal(List<V> values) {
        Preconditions.checkNotNull(values, "List of values to add cannot be null.");

        if (!values.isEmpty()) {
            try {
                // overwrite with a put
                backend.db.put(
                        columnFamily,
                        writeOptions,
                        serializeCurrentKeyWithGroupAndNamespace(),
                        listSerializer.serializeList(values, elementSerializer));
            } catch (IOException | RocksDBException e) {
                throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
            }
        } else {
            clear();
        }
    }

    @Override
    public void addAll(List<V> values) {
        Preconditions.checkNotNull(values, "List of values to add cannot be null.");

        if (!values.isEmpty()) {
            try {
                backend.db.merge(
                        columnFamily,
                        writeOptions,
                        serializeCurrentKeyWithGroupAndNamespace(),
                        listSerializer.serializeList(values, elementSerializer));
            } catch (IOException | RocksDBException e) {
                throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
            }
        }
    }
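  • Flink pre-registers a string-append style merge operator on every column family, which is why add() can be a single merge() instead of read-modify-write. The effect can be reproduced with plain RocksJava's built-in operator (a minimal sketch; Flink's actual operator and list encoding differ, and the path is illustrative):

import org.rocksdb.Options;
import org.rocksdb.RocksDB;

public class MergeAppendDemo {
    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        try (Options options = new Options()
                        .setCreateIfMissing(true)
                        .setMergeOperatorName("stringappend"); // built-in operator, ',' delimiter
                RocksDB db = RocksDB.open(options, "/tmp/merge-demo")) {
            db.merge("list".getBytes(), "a".getBytes());
            db.merge("list".getBytes(), "b".getBytes()); // appended without read-modify-write
            System.out.println(new String(db.get("list".getBytes()))); // prints "a,b"
        }
    }
}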

RocksDBMapState

  • Each user key maps to its own value, directly reusing RocksDB's native kv layout; a usage sketch follows the code below.
    @Override
    public UV get(UK userKey) throws IOException, RocksDBException {
        // serialize the composite key
        byte[] rawKeyBytes =
                serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
        // look up the value
        byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);

        return (rawValueBytes == null
                ? null
                : deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));
    }

    @Override
    public void put(UK userKey, UV userValue) throws IOException, RocksDBException {

        byte[] rawKeyBytes =
                serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
        byte[] rawValueBytes = serializeValueNullSensitive(userValue, userValueSerializer);

        backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
    }

    @Override
    public void putAll(Map<UK, UV> map) throws IOException, RocksDBException {
        if (map == null) {
            return;
        }
        // use RocksDB's write batch so the writes are applied consistently, as one atomic operation
        try (RocksDBWriteBatchWrapper writeBatchWrapper =
                new RocksDBWriteBatchWrapper(
                        backend.db, writeOptions, backend.getWriteBatchSize())) {
            for (Map.Entry<UK, UV> entry : map.entrySet()) {
                byte[] rawKeyBytes =
                        serializeCurrentKeyWithGroupAndNamespacePlusUserKey(
                                entry.getKey(), userKeySerializer);
                byte[] rawValueBytes =
                        serializeValueNullSensitive(entry.getValue(), userValueSerializer);
                writeBatchWrapper.put(columnFamily, rawKeyBytes, rawValueBytes);
            }
        }
    }

    @Override
    public void remove(UK userKey) throws IOException, RocksDBException {
        byte[] rawKeyBytes =
                serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);

        backend.db.delete(columnFamily, writeOptions, rawKeyBytes);
    }

    @Override
    public boolean contains(UK userKey) throws IOException, RocksDBException {
        byte[] rawKeyBytes =
                serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
        byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);

        return (rawValueBytes != null);
    }

    @Override
    public Iterable<Map.Entry<UK, UV>> entries() {
        return this::iterator;
    }

    @Override
    public Iterable<UK> keys() {
        final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();

        return () ->
                new RocksDBMapIterator<UK>(
                        backend.db,
                        prefixBytes,
                        userKeySerializer,
                        userValueSerializer,
                        dataInputView) {
                    @Nullable
                    @Override
                    public UK next() {
                        RocksDBMapEntry entry = nextEntry();
                        return (entry == null ? null : entry.getKey());
                    }
                };
    }

    @Override
    public Iterable<UV> values() {
        final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();

        return () ->
                new RocksDBMapIterator<UV>(
                        backend.db,
                        prefixBytes,
                        userKeySerializer,
                        userValueSerializer,
                        dataInputView) {
                    @Override
                    public UV next() {
                        RocksDBMapEntry entry = nextEntry();
                        return (entry == null ? null : entry.getValue());
                    }
                };
    }
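  • From the user's side this is plain MapState; each user key below becomes its own RocksDB entry under the current record key. A sketch with illustrative names; it must run on a keyed stream, e.g. stream.keyBy(w -> w).flatMap(new CountPerWord()):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountPerWord extends RichFlatMapFunction<String, String> {
    private transient MapState<String, Long> counts;

    @Override
    public void open(Configuration parameters) {
        counts = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("counts", Types.STRING, Types.LONG));
    }

    @Override
    public void flatMap(String word, Collector<String> out) throws Exception {
        Long old = counts.get(word);                  // RocksDB get on the composite key
        counts.put(word, old == null ? 1L : old + 1); // RocksDB put
        out.collect(word);
    }
}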

RocksDBMapIterator

  • An iterator over RocksDB entries built on the JDK Iterator interface; a plain-RocksJava prefix-scan sketch follows the code below.
  private abstract class RocksDBMapIterator<T> implements Iterator<T> {

        private static final int CACHE_SIZE_LIMIT = 128;

        /** The db where data resides. */
        private final RocksDB db;

        /**
         * The prefix bytes of the key being accessed. All entries under the same key have the same
         * prefix, hence we can stop iterating once coming across an entry with a different prefix.
         */
        @Nonnull private final byte[] keyPrefixBytes;

        /**
         * True if all entries have been accessed or the iterator has come across an entry with a
         * different prefix.
         */
        private boolean expired = false;

        /** An in-memory cache for the entries in the rocksdb. */
        private ArrayList<RocksDBMapEntry> cacheEntries = new ArrayList<>();

        /**
         * The entry pointing to the current position which is last returned by calling {@link
         * #nextEntry()}.
         */
        private RocksDBMapEntry currentEntry;

        private int cacheIndex = 0;

        private final TypeSerializer<UK> keySerializer;
        private final TypeSerializer<UV> valueSerializer;
        private final DataInputDeserializer dataInputView;

        RocksDBMapIterator(
                final RocksDB db,
                final byte[] keyPrefixBytes,
                final TypeSerializer<UK> keySerializer,
                final TypeSerializer<UV> valueSerializer,
                DataInputDeserializer dataInputView) {

            this.db = db;
            this.keyPrefixBytes = keyPrefixBytes;
            this.keySerializer = keySerializer;
            this.valueSerializer = valueSerializer;
            this.dataInputView = dataInputView;
        }

        @Override
        public boolean hasNext() {
            // refill the cache
            loadCache();

            return (cacheIndex < cacheEntries.size());
        }

        @Override
        public void remove() {
            if (currentEntry == null || currentEntry.deleted) {
                throw new IllegalStateException(
                        "The remove operation must be called after a valid next operation.");
            }

            currentEntry.remove();
        }

        final RocksDBMapEntry nextEntry() {
            loadCache();

            if (cacheIndex == cacheEntries.size()) {
                if (!expired) {
                    throw new IllegalStateException();
                }

                return null;
            }
            // advance the pointer
            this.currentEntry = cacheEntries.get(cacheIndex);
            cacheIndex++;

            return currentEntry;
        }

        private void loadCache() {
            if (cacheIndex > cacheEntries.size()) {
                throw new IllegalStateException();
            }

            // Load cache entries only when the cache is empty and there still exist unread entries
            // nothing to do: the cache still has unread entries, or iteration has finished
            if (cacheIndex < cacheEntries.size() || expired) {
                return;
            }

            // use try-with-resources to ensure the RocksIterator is released even if a runtime
            // exception occurs in the code block below
            // create the iterator
            try (RocksIteratorWrapper iterator =
                    RocksDBOperationUtils.getRocksIterator(
                            db, columnFamily, backend.getReadOptions())) {

                /*
                 * The iteration starts from the prefix bytes at the first loading. After #nextEntry() is called,
                 * the currentEntry points to the last returned entry, and at that time, we will start
                 * the iterating from currentEntry if reloading cache is needed.
                 */
                byte[] startBytes =
                        (currentEntry == null ? keyPrefixBytes : currentEntry.rawKeyBytes);

                cacheEntries.clear();
                cacheIndex = 0;
                // position the iterator at the start key
                iterator.seek(startBytes);

                /*
                 * If the entry pointing to the current position is not removed, it will be the first entry in the
                 * new iterating. Skip it to avoid redundant access in such cases.
                 */
                if (currentEntry != null && !currentEntry.deleted) {
                    iterator.next();
                }

                while (true) {
                    // stop when the iterator is exhausted or the key no longer starts with the prefix
                    if (!iterator.isValid()
                            || !startWithKeyPrefix(keyPrefixBytes, iterator.key())) {
                        expired = true;
                        break;
                    }

                    if (cacheEntries.size() >= CACHE_SIZE_LIMIT) {
                        break;
                    }

                    RocksDBMapEntry entry =
                            new RocksDBMapEntry(
                                    db,
                                    keyPrefixBytes.length,
                                    iterator.key(),
                                    iterator.value(),
                                    keySerializer,
                                    valueSerializer,
                                    dataInputView);
                    // add the entry to the cache
                    cacheEntries.add(entry);

                    iterator.next();
                }
            }
        }
    }
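  • The cache refill above is essentially a bounded prefix scan. Stripped of Flink's wrappers, the pattern looks like this in plain RocksJava (a sketch; keys and path are illustrative):

import java.util.Arrays;

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;

public class PrefixScanDemo {
    static boolean startsWith(byte[] key, byte[] prefix) {
        return key.length >= prefix.length
                && Arrays.equals(Arrays.copyOf(key, prefix.length), prefix);
    }

    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
                RocksDB db = RocksDB.open(options, "/tmp/prefix-demo")) {
            db.put("k1/a".getBytes(), "1".getBytes());
            db.put("k1/b".getBytes(), "2".getBytes());
            db.put("k2/a".getBytes(), "3".getBytes());

            byte[] prefix = "k1/".getBytes();
            try (RocksIterator it = db.newIterator()) {
                // seek to the first key >= prefix; stop at the first key with a different prefix
                for (it.seek(prefix); it.isValid() && startsWith(it.key(), prefix); it.next()) {
                    System.out.println(new String(it.key()) + " = " + new String(it.value()));
                }
            }
        }
    }
}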

RocksDBValueState

  @Override
    public V value() {
        try {
            byte[] valueBytes =
                    backend.db.get(columnFamily, serializeCurrentKeyWithGroupAndNamespace());

            if (valueBytes == null) {
                return getDefaultValue();
            }
            dataInputView.setBuffer(valueBytes);
            return valueSerializer.deserialize(dataInputView);
        } catch (IOException | RocksDBException e) {
            throw new FlinkRuntimeException("Error while retrieving data from RocksDB.", e);
        }
    }

    @Override
    public void update(V value) {
        if (value == null) {
            clear();
            return;
        }

        try {
            backend.db.put(
                    columnFamily,
                    writeOptions,
                    serializeCurrentKeyWithGroupAndNamespace(),
                    serializeValue(value));
        } catch (Exception e) {
            throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
        }
    }

RocksDBReducingState

class RocksDBReducingState<K, N, V> extends AbstractRocksDBAppendingState<K, N, V, V, V>
        implements InternalReducingState<K, N, V> {

    /** User-specified reduce function. */
    private final ReduceFunction<V> reduceFunction;

    /**
     * Creates a new {@code RocksDBReducingState}.
     *
     * @param columnFamily The RocksDB column family that this state is associated to.
     * @param namespaceSerializer The serializer for the namespace.
     * @param valueSerializer The serializer for the state.
     * @param defaultValue The default value for the state.
     * @param reduceFunction The reduce function used for reducing state.
     * @param backend The backend that this state is bound to.
     */
    private RocksDBReducingState(
            ColumnFamilyHandle columnFamily,
            TypeSerializer<N> namespaceSerializer,
            TypeSerializer<V> valueSerializer,
            V defaultValue,
            ReduceFunction<V> reduceFunction,
            RocksDBKeyedStateBackend<K> backend) {

        super(columnFamily, namespaceSerializer, valueSerializer, defaultValue, backend);
        this.reduceFunction = reduceFunction;
    }

    @Override
    public TypeSerializer<K> getKeySerializer() {
        return backend.getKeySerializer();
    }

    @Override
    public TypeSerializer<N> getNamespaceSerializer() {
        return namespaceSerializer;
    }

    @Override
    public TypeSerializer<V> getValueSerializer() {
        return valueSerializer;
    }

    @Override
    public V get() {
        return getInternal();
    }

    @Override
    public void add(V value) throws Exception {
        byte[] key = getKeyBytes();
        V oldValue = getInternal(key);
        V newValue = oldValue == null ? value : reduceFunction.reduce(oldValue, value);
        updateInternal(key, newValue);
    }

    @Override
    public void mergeNamespaces(N target, Collection<N> sources) {
        if (sources == null || sources.isEmpty()) {
            return;
        }

        try {
            V current = null;

            // merge the sources to the target
            for (N source : sources) {
                if (source != null) {
                    setCurrentNamespace(source);
                    final byte[] sourceKey = serializeCurrentKeyWithGroupAndNamespace();
                    final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);

                    if (valueBytes != null) {
                        backend.db.delete(columnFamily, writeOptions, sourceKey);
                        dataInputView.setBuffer(valueBytes);
                        V value = valueSerializer.deserialize(dataInputView);

                        if (current != null) {
                            current = reduceFunction.reduce(current, value);
                        } else {
                            current = value;
                        }
                    }
                }
            }

            // if something came out of merging the sources, merge it or write it to the target
            if (current != null) {
                // create the target full-binary-key
                setCurrentNamespace(target);
                final byte[] targetKey = serializeCurrentKeyWithGroupAndNamespace();
                final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);

                if (targetValueBytes != null) {
                    dataInputView.setBuffer(targetValueBytes);
                    // target also had a value, merge
                    V value = valueSerializer.deserialize(dataInputView);

                    current = reduceFunction.reduce(current, value);
                }

                // serialize the resulting value
                dataOutputView.clear();
                valueSerializer.serialize(current, dataOutputView);

                // write the resulting value
                backend.db.put(
                        columnFamily, writeOptions, targetKey, dataOutputView.getCopyOfBuffer());
            }
        } catch (Exception e) {
            throw new FlinkRuntimeException("Error while merging state in RocksDB", e);
        }
    }

    @SuppressWarnings("unchecked")
    static <K, N, SV, S extends State, IS extends S> IS create(
            StateDescriptor<S, SV> stateDesc,
            Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>>
                    registerResult,
            RocksDBKeyedStateBackend<K> backend) {
        return (IS)
                new RocksDBReducingState<>(
                        registerResult.f0,
                        registerResult.f1.getNamespaceSerializer(),
                        registerResult.f1.getStateSerializer(),
                        stateDesc.getDefaultValue(),
                        ((ReducingStateDescriptor<SV>) stateDesc).getReduceFunction(),
                        backend);
    }
}
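  • Usage-wise, only the descriptor differs from the other keyed state types; a sketch with illustrative names, applied on a keyed stream:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;

public class RunningSum extends RichMapFunction<Long, Long> {
    private transient ReducingState<Long> sum;

    @Override
    public void open(Configuration parameters) {
        sum = getRuntimeContext().getReducingState(
                new ReducingStateDescriptor<>("sum", Long::sum, Types.LONG));
    }

    @Override
    public Long map(Long value) throws Exception {
        sum.add(value); // get + reduce + put against RocksDB, as shown in add() above
        return sum.get();
    }
}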

RocksDB utilities

OpenDB

/**
 * Opens a RocksDB instance at the given path with the specified column families.
 */
public static RocksDB openDB(
            String path,
            List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
            List<ColumnFamilyHandle> stateColumnFamilyHandles,
            ColumnFamilyOptions columnFamilyOptions,
            DBOptions dbOptions)
            throws IOException {
        List<ColumnFamilyDescriptor> columnFamilyDescriptors =
                new ArrayList<>(1 + stateColumnFamilyDescriptors.size());

        // we add the required descriptor for the default CF in FIRST position, see
        // https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families
        // add the default column family first
        columnFamilyDescriptors.add(
                new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions));
        // append the user-provided state column families
        columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);

        RocksDB dbRef;

        try {
            dbRef =
                    RocksDB.open(
                            Preconditions.checkNotNull(dbOptions),
                            Preconditions.checkNotNull(path),
                            columnFamilyDescriptors,
                            stateColumnFamilyHandles);
        } catch (RocksDBException e) {
            IOUtils.closeQuietly(columnFamilyOptions);
            columnFamilyDescriptors.forEach((cfd) -> IOUtils.closeQuietly(cfd.getOptions()));

            // improve error reporting on Windows
            throwExceptionIfPathLengthExceededOnWindows(path, e);

            throw new IOException("Error while opening RocksDB instance.", e);
        }

        // requested + default CF
        Preconditions.checkState(
                1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
                "Not all requested column family handles have been created");
        return dbRef;
    }
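  • A minimal plain-RocksJava caller that opens a database with column families the same way openDB does (a sketch; the CF name and path are illustrative):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;

public class OpenWithColumnFamilies {
    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        try (ColumnFamilyOptions cfOptions = new ColumnFamilyOptions();
                DBOptions dbOptions = new DBOptions()
                        .setCreateIfMissing(true)
                        .setCreateMissingColumnFamilies(true)) {
            // the default CF must come first, mirroring openDB() above
            List<ColumnFamilyDescriptor> descriptors = Arrays.asList(
                    new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOptions),
                    new ColumnFamilyDescriptor("my-state".getBytes(), cfOptions));
            List<ColumnFamilyHandle> handles = new ArrayList<>();
            try (RocksDB db = RocksDB.open(dbOptions, "/tmp/cf-demo", descriptors, handles)) {
                db.put(handles.get(1), "k".getBytes(), "v".getBytes()); // write into the state CF
                handles.forEach(ColumnFamilyHandle::close);
            }
        }
    }
}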