Skip to content

Kettle (Pentaho Data Integration) 笔记

Kettle(现更名为 Pentaho Data Integration,简称 PDI)是一款纯 Java 编写的开源 ETL 工具。其核心理念是“面向元数据”,即通过配置元数据(XML 格式的 .ktr 和 .kjb 文件,或资源库中的元数据)来定义数据流转逻辑,而非硬编码。

1. 核心工具集

工具名称主要职能运行方式适用场景
Spoon图形化开发界面GUI 桌面端核心开发、流程设计、本地调试
Pan转换(Transformation)执行器命令行 (CLI)生产环境、调度系统单步执行数据清洗(本地文件或资源库)
Kitchen作业(Job)执行器命令行 (CLI)生产环境定时调度完整的工作流(本地文件或资源库)
Carte轻量级集群/路由服务器嵌入式 Web 服务远程执行、微服务集成、分布式集群部署

2. 转换 (.ktr) vs 作业 (.kjb) 的区别

【作业 (Job)】---(串行控制)---> [作业项A] -> [作业项B] -> [调用转换C]
                                                          |
                       【转换 (Transformation)】<----------+
                       (并行流转) [输入步骤] ===(数据行缓存)===> [过滤步骤] ===(数据行缓存)===> [输出步骤]
  • 转换 (Transformation):
    • 关注点: 数据流(Data Flow)。侧重于数据的提取、转换和装载。
    • 执行机制: 并行执行。当启动一个转换时,所有的步骤(Steps)会同时启动。它们各自处于独立的线程中,数据通过“行缓存(Row Set)”以异步队列的方式从上游步骤传递给下游步骤。
    • 最小单元: 步骤(Step)。
  • 作业 (Job):
    • 关注点: 工作流(Workflow)。侧重于控制流、步骤先后顺序、条件分支、定时触发及错误通知。
    • 执行机制: 串行执行。严格按照连线箭头的方向,前一步成功或满足条件(True/False)后,才会触发下一步。
    • 最小单元: 作业项(Job Entry)。

二、 核心高频组件深度解析

1. 输入组件 (Input)

  • 表输入 (Table Input):
    • 最常用的组件,通过 SQL 语句从数据库提取数据。
    • 高级技巧: 勾选“替换 SQL 语句里的变量”,可以使用 ${VAR} 动态改变 SQL。如果勾选“从步骤插入数据”,可以通过上游步骤传入的字段,用 ? 占位符实现参数化查询(类似于 PreparedStatement)。
  • 文本文件输入 / CSV 文件输入:
    • 用于读取扁平文件。需要注意编码格式(如 UTF-8GBK)以及分隔符(如 ,;\t)。
  • REST Client / HTTP Post:
    • 用于对接外部微服务 API,传入 URL 和参数,直接获取 JSON/XML 响应报文。

2. 转换/清洗组件 (Transform)

  • 字段选择 (Select Values):
    • 选择/改名: 移除不需要的字段,或者修改字段别名。
    • 修改: 强转数据类型(如 String 转 Integer,String 转 Date 并指定格式 yyyy-MM-dd HH:mm:ss)。
  • 值映射 (Value Mapper):
    • 等价于 SQL 的 CASE WHEN ... THEN ... ELSE。例如将 M 映射为 F 映射为 ,未匹配项给默认值。
  • 字符串替换 / 字符串操作:
    • 支持正则替换,去除首尾空格(Trim),大小写转换。
  • JavaScript 代码 (Modified Java Script Value):
    • 提供极其灵活的复杂脚本清洗能力。
    • 注意: 内置的是 Rhino 引擎解析 JS,大数据量下(百万级以上)性能会明显下降,能用原生组件(如计算器、字符串操作)代替的尽量代替。

3. 应用/关联组件 (Lookup / Join)

  • 数据库查询 (Database Lookup):
    • 逐行去目标表查询匹配字段。由于每行数据都要发起一次数据库查询,若未开启缓存,性能极差。在大数据量时必须勾选**“启用缓存(Enable Cache)”**。
  • 流查询 (Stream Lookup):
    • 将副表的数据全部加载到内存中,然后主表在内存中进行匹配。适合副表数据量较小(如字典表、配置表)的场景,速度极快。
  • 合并记录 (Merge Rows / Diff):
    • 对比两个结构相同的数据集,自动标记哪些行是 identical(相同)、changed(修改)、new(新增)或 deleted(删除)。常用于构建增量同步的基准。

4. 输出组件 (Output)

  • 表输出 (Table Output):
    • 采用数据库提供的 JDBC Batch Commit 机制,批量向目标表灌入数据。速度最快。
  • 插入/更新 (Insert / Update):
    • 根据指定的“用来查询的关键字”在目标表中查找。如果记录存在且其他字段有变动,则执行 UPDATE;如果记录不存在,则执行 INSERT
    • 代价: 每行都会进行一次先查后写,在大数据量(几百万条以上)同步时,由于频繁的索引读写与锁竞争,速度较慢。

三、 企业级实战场景解决方案

1. 基于时间戳的增量数据抽取架构

在真实的数仓建设中,不可能每天都全量同步千万级的大表。标准方案是利用“增量水位线(时间戳)”。

💡 核心实现逻辑:

  1. 作业设计:
    [START] -> [获取目标表最大时间戳(转换)] -> [抽取并洗数增量数据(转换)] -> [更新/记录日志] -> [SUCCESS]
  2. 第一步(转换 - 获取水位线):
    • 用【表输入】查询目标表当前最大的更新时间:SELECT MAX(update_time) AS max_time FROM dest_table
    • 连线到【设置变量】,设置一个作业级别的变量 LAST_UPDATE_TIME,值为 ${max_time}。若无记录,则给默认起始值(如 1970-01-01 00:00:00)。
  3. 第二步(转换 - 抽取增量):
    • 用【表输入】从源表抽取,SQL 语句写为:
      sql
      SELECT * FROM source_table WHERE update_time > '${LAST_UPDATE_TIME}'
    • 勾选“替换 SQL 语句里的变量”。
    • 尾端连接【表输出】或【插入/更新】写入目标表。

2. 动态参数传递与多环境解耦

避免在 KTR 里面写死数据库 IP、用户名和密码。

💡 最佳实践:

  • 全局配置文件 kettle.properties:
    • 在 Kettle 的家目录(通常在用户目录下的 .kettle/ 文件夹中)找到 kettle.properties
    • 在里面配置环境变量:
properties
    DB_HOST_PROD = 192.168.1.100
    DB_USER_PROD = db_user
    DB_PASS_PROD = Encrypted 2be98afc86aa7f2e4cb79ce71da9fa6d4
  • 在 Spoon 的 DB 连接配置里,主机名直接填写 ${DB_HOST_PROD}
  • 命令行动态传参 (Named Parameters):
    • 在转换/作业的属性(按下 Ctrl + TCtrl + J)的 "Parameters" 选项卡中定义参数名称(如 ETL_DATE)。
    • 在组件中通过 ${ETL_DATE} 引用。
    • 使用命令行启动时传入:kitchen.sh -file=job.kjb "-param:ETL_DATE=2026-05-20"

四、 生产环境性能优化调优

当遇到数仓同步缓慢、甚至卡死的问题时,应从以下四个维度进行全链路调优:

1. JVM 内存优化(解决 OOM)

Kettle 默认分配的堆内存只有 512M 或 1024M,处理大批量数据极易引发 java.lang.OutOfMemoryError: Java heap space

  • 修改文件: spoon.sh (Linux) 或 spoon.bat (Windows) / pan.sh / kitchen.sh
  • 优化参数: 搜索 PENTAHO_DI_JAVA_OPTIONS,根据服务器物理内存进行调整:
bash
  # 调整前
  PENTAHO_DI_JAVA_OPTIONS="-Xms1024m -Xmx2048m"
  # 调整后 (以16G内存服务器为例,分配 8G 内存给 Kettle)
  PENTAHO_DI_JAVA_OPTIONS="-Xms4g -Xmx8g -XX:+UseG1GC"

2. 批量提交数 (Commit Size) 调优

在【表输出】组件中,“提交记录数量 (Commit size)”默认为 1000

  • 调优方案:
    • 对于普通关系型数据库(MySQL/Oracle),建议将该值调整到 500020000 之间。
    • 如果调得太小(如100),I/O 和事务提交过于频繁,速度极慢;调得太大(如100,000),会导致单次事务占用过多数据库 Undo 表空间,或引发网卡吞吐瓶颈。

3. 步骤并发度调优 (Number of Copies)

由于“转换”内的组件是多线程并行的,如果某一个清洗环节(如复杂的【JavaScript 代码】或【数据库查询】)处理得特别慢,它会成为整条流水线的木桶短板(上游连线上的缓冲区 Row Set 会堆满,显示 100%)。

  • 优化方案:
    • 在卡顿的组件上右键 -> 改变开始复制的数量 (Change Number of Copies)
    • 默认是 1(单线程),可以根据服务器 CPU 核心数,将其调整为 34。Kettle 会自动利用多线程并发处理该步骤的数据,大幅缓解性能瓶颈。

4. 数据库连接池与共享连接

  • 多人在同一个工程下开发时,应当在“主对象树”中右键数据库连接,选择**“共享 (Share)”**。这会把连接信息写入全局,避免在每个 KTR 里面重复配置。
  • 在连接配置的 "Pooling" (连接池) 选项卡中启用连接池,设置 Maximum Pool Size(如 20),以避免多线程并发时高频建立/销毁数据库连接。

五、 生产环境自动化调度(版本9.3)

在开发测试完毕后,严禁直接在 UI 界面 (Spoon) 挂着跑任务。必须根据服务器环境将其打包为自动化脚本。

1. 命令行参数内核详解(文件型 vs 资源库型)

  • 本地文件型参数 (-file): 用于运行本地 .ktr/.kjb 文件。
  • 资源库型参数 (/rep): 用于连接 Kettle 数据库或文件资源库,命令如下:
    • /rep: 资源库名称
    • /user: 资源库用户名
    • /pass: 资源库密码
    • /dir: 资源库中的目录路径(如 //dw_project
    • /job/trans: 资源库中的作业或转换名称

2. Windows 环境 (.bat) 资源库自动化调度脚本模板

如果您的 Kettle 部署在 Windows 服务器环境,且任务保存在 Kettle 资源库 中,可以使用以下批处理脚本。脚本开头利用了 goto 标签作为干净的注释块,便于维护。

batch
@goto label  
@param rep      数据库资源库名称  
@param user     数据库资源库账户  
@param pass     数据库资源库密码  
@param dir      资源库内作业所在路径  
@param job      作业(Job)名称  
@param level    日志打印级别 (Basic/Detailed/Debug)  
@param logfile  运行日志输出本地路径
:label:

:: 1. CD至Kettle本地安装路径并切换盘符
E:
CD E:\迅雷下载\data-integration

:: 2. 设置资源库连接参数与作业变量
@SET rep=20orcl
@SET user=admin 
@SET pass=admin
@SET dir=/
@SET job=test
@SET level=Basic
@SET logfile=C:\Users\Administrator\Desktop\demo.log

:: 3. 设置需要动态传递给作业的命名参数 (例如传入 hand=0 控制清洗分支)
@SET param="/param:hand=0"

:: 4. 调用 Kitchen 引擎执行资源库中的作业
kitchen /rep %rep% /user %user% /pass %pass% /dir %dir% /job %job% %param% /level %level% /logfile %logfile%

3. Linux 环境 (.sh) 本地文件自动化调度脚本模板

若任务在 Linux 环境下以文件形式存放,通常配合 crontab 定时任务调用下方 Shell 脚本:

bash
#!/bin/bash
# 1. 配置环境变量 (防止 crontab 执行时找不到 Java 环境)
export JAVA_HOME=/usr/local/java/jdk1.8.0_211
export PATH=$JAVA_HOME/bin:$PATH
export KETTLE_HOME=/opt/data-integration

# 2. 定义路径变量
KITCHEN_BIN="${KETTLE_HOME}/kitchen.sh"
JOB_PATH="/opt/kettle/projects/dw_finance/main_loader.kjb"
LOG_DIR="/opt/kettle/logs"
LOG_FILE="${LOG_DIR}/job_$(date +%Y%m%d_%H%M%S).log"

mkdir -p ${LOG_DIR}
echo "[$(date "+%Y-%m-%d %H:%M:%S")] === Kettle 任务开始 ===" | tee -a ${LOG_FILE}

# 3. 动态计算业务日期(作为参数传递给 Kettle 作业)
if [ -n "$1" ]; then v_date=$1; else v_date=$(date -d "1 day ago" +%Y-%m-%d); fi
echo "当前处理的业务日期 (BIZ_DATE) 为: ${v_date}" | tee -a ${LOG_FILE}

# 4. 调用 Kitchen 命令执行本地作业
${KITCHEN_BIN} -file=${JOB_PATH} "-param:BIZ_DATE=${v_date}" -level=Basic >> ${LOG_FILE} 2>&1

# 5. 捕获退出状态码并告警
EXIT_CODE=$?
if [ ${EXIT_CODE} -eq 0 ]; then
    echo "【SUCCESS】任务执行成功!"
    exit 0
else
    echo "【ERROR】任务执行失败,退出码: ${EXIT_CODE},请检查日志: ${LOG_FILE}"
    exit ${EXIT_CODE}
fi

六、 常见故障排查

1. 找不到数据库驱动 (Driver Not Found)

  • 现象: 新建连接或跑脚本报错 Driver class 'com.mysql.cj.jdbc.Driver' could not be found
  • 根因: Kettle 属于纯 Java 应用,默认没有打包各类数据库的商业 JDBC 驱动。
  • 解决办法: 下载对应数据库版本的驱动 .jar 包(如 ojdbc8.jarmysql-connector-java.jar),将其复制到 Kettle 安装目录下的 lib/ 文件夹中。必须彻底关闭并重启 Spoon 客户端或后台服务,驱动才会加载。

2. 字段中文乱码

  • 现象: 从源库同步到目标库后,中文全部变成问号 ??? 或乱码。
  • 排查步骤:
    1. 确认输入编码: 若是文件输入,检查【文本文件输入】里的编码格式是否选对了 UTF-8GBK
    2. 确认数据库连接字符串(关键): 在 Kettle 中双击打开你的数据库连接配置,点击 "Advanced" (高级),在下方的空白文本框中加入以下连接参数(以 MySQL 为例):
properties
     characterEncoding=utf8
     useUnicode=true

3. 字段类型溢出、长度不足

  • 现象: 报错 Value too long for column 或者数据被无故截断。
  • 排查步骤:
    • 在源表抽取时,若某些字段是 VARCHAR(4000)TEXT,进入 Kettle 后,Kettle 会在内部流中对字段长度做元数据定义。
    • 如果在中间通过【字段选择 (Select Values)】或【串联字符串】改变了字段,请及时在【字段选择】的“元数据 (Meta-data)”选项卡中,显式地对该字段重新定义其长度 (Length)精度 (Precision),确保下游输出组件能拿到正确的字段宽度。