Hadoop

Batch ingestion of data into Apache Pinot using Apache Hadoop.

Segment Creation and Push

Pinot supports Apache Hadoop as a processor to create and push segment files to the database. The Pinot distribution is bundled with the Hadoop code to process your files, convert them to segments, and upload them to Pinot.

You can follow the wiki to build Pinot from source. The resulting JAR file can be found in pinot/target/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar

Next, change the execution config in the job spec to the following:

# executionFrameworkSpec: Defines ingestion jobs to be running.
executionFrameworkSpec:

  # name: execution framework name
  name: 'hadoop'

  # segmentGenerationJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunner'

  # segmentTarPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentTarPushJobRunner'

  # segmentUriPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentUriPushJobRunner'

  # segmentMetadataPushJobRunnerClassName: class name implements org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
  segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentMetadataPushJobRunner'

  # extraConfigs: extra configs for execution framework.
  extraConfigs:

    # stagingDir: directory on the distributed filesystem that hosts all generated segments; it is then moved in its entirety to the output directory.
    stagingDir: your/local/dir/staging

You can check out the sample job spec here.

Finally, execute the Hadoop job using the following command:

export PINOT_VERSION=0.10.0
export PINOT_DISTRIBUTION_DIR=${PINOT_ROOT_DIR}/build/
export HADOOP_CLIENT_OPTS="-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml"

hadoop jar \
        ${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \
        org.apache.pinot.tools.admin.PinotAdministrator \
        LaunchDataIngestionJob \
        -jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/batch/airlineStats/hadoopIngestionJobSpec.yaml

Ensure environment variables PINOT_ROOT_DIR and PINOT_VERSION are set properly.
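
A minimal pre-flight sketch for the check described above. The function name check_pinot_env is hypothetical and not part of the Pinot distribution; it only verifies that the two variables are non-empty, not that they point to a valid build.

```shell
# Hypothetical helper (not shipped with Pinot): report which of the
# required environment variables are unset or empty.
check_pinot_env() {
  status=0
  if [ -z "${PINOT_ROOT_DIR:-}" ]; then
    echo "PINOT_ROOT_DIR is not set"
    status=1
  fi
  if [ -z "${PINOT_VERSION:-}" ]; then
    echo "PINOT_VERSION is not set"
    status=1
  fi
  # Returns 0 when both variables are set, 1 otherwise.
  return "$status"
}
```

Calling check_pinot_env before the hadoop jar command, and stopping when it returns a non-zero status, avoids launching a job with an unresolved JAR or plugin path.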

Data Preprocessing before Segment Creation

Some users need to massage the data (for example, by partitioning, sorting, or resizing) before creating and pushing segments to Pinot.

The MapReduce job SegmentPreprocessingJob is the best fit for this use case, whether the input data is in Avro or ORC format.

The following example shows how to use SegmentPreprocessingJob.

In Hadoop properties, set the following to enable this job:

enable.preprocessing = true
preprocess.path.to.output = <output_path>

In table config, specify the operations in preprocessing.operations that you'd like to enable in the MR job, and then specify the exact configs regarding those operations:

{
    "OFFLINE": {
        "metadata": {
            "customConfigs": {
                "preprocessing.operations": "resize, partition, sort", // To enable the following preprocessing operations
                "preprocessing.max.num.records.per.file": "100",       // To enable resizing
                "preprocessing.num.reducers": "3"                      // To enable resizing
            }
        },
        ...
        "tableIndexConfig": {
            "aggregateMetrics": false,
            "autoGeneratedInvertedIndex": false,
            "bloomFilterColumns": [],
            "createInvertedIndexDuringSegmentGeneration": false,
            "invertedIndexColumns": [],
            "loadMode": "MMAP",
            "nullHandlingEnabled": false,
            "segmentPartitionConfig": {       // To enable partitioning
                "columnPartitionMap": {
                    "item": {
                        "functionName": "murmur",
                        "numPartitions": 4
                    }
                }
            },
            "sortedColumn": [                // To enable sorting
                "actorId"
            ],
            "streamConfigs": {}
        },
        "tableName": "tableName_OFFLINE",
        "tableType": "OFFLINE",
        "tenants": {
            ...
        }
    }
}

preprocessing.num.reducers

Minimum number of reducers. Optional. Fetched when partitioning is disabled and resizing is enabled. This parameter avoids producing too many small input files for Pinot, which would leave the Pinot server holding too many small segments and using too many threads.

preprocessing.max.num.records.per.file

Maximum number of records per output file. Optional. Unlike "preprocessing.num.reducers", this parameter avoids producing too few large input files for Pinot, which would lose the advantage of multi-threading when querying. When not set, each reducer generates one output file. When set (e.g. to M), the original output file is split into multiple files, and each new output file contains at most M records. This applies whether or not partitioning is enabled.
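
To make the splitting rule concrete, here is a small arithmetic sketch. The helper num_output_files is hypothetical, and it assumes each file is filled up to the limit before a new one is started; the document only guarantees that no file exceeds M records.

```shell
# Hypothetical helper: estimate how many files one reducer's output is
# split into when preprocessing.max.num.records.per.file is set.
# Assumption: files are filled to the limit, so the count is ceil(records / max).
num_output_files() {
  records=$1      # records written by a single reducer
  max_records=$2  # value of preprocessing.max.num.records.per.file
  echo $(( (records + max_records - 1) / max_records ))
}
```

With the example value of 100 from the table config above, num_output_files 250 100 prints 3: two files of 100 records and one of 50.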

For more details on this MR job, refer to this document.
