Appearance
Kettle (Pentaho Data Integration) 笔记
Kettle(现更名为 Pentaho Data Integration,简称 PDI)是一款纯 Java 编写的开源 ETL 工具。其核心理念是“面向元数据”,即通过配置元数据(XML 格式的 .ktr 和 .kjb 文件,或资源库中的元数据)来定义数据流转逻辑,而非硬编码。
1. 核心工具集
| 工具名称 | 主要职能 | 运行方式 | 适用场景 |
|---|---|---|---|
| Spoon | 图形化开发界面 | GUI 桌面端 | 核心开发、流程设计、本地调试 |
| Pan | 转换(Transformation)执行器 | 命令行 (CLI) | 生产环境、调度系统单步执行数据清洗(本地文件或资源库) |
| Kitchen | 作业(Job)执行器 | 命令行 (CLI) | 生产环境定时调度完整的工作流(本地文件或资源库) |
| Carte | 轻量级集群/路由服务器 | 嵌入式 Web 服务 | 远程执行、微服务集成、分布式集群部署 |
2. 转换 (.ktr) vs 作业 (.kjb) 的区别
【作业 (Job)】---(串行控制)---> [作业项A] -> [作业项B] -> [调用转换C]
|
【转换 (Transformation)】<----------+
(并行流转) [输入步骤] ===(数据行缓存)===> [过滤步骤] ===(数据行缓存)===> [输出步骤]- 转换 (Transformation):
- 关注点: 数据流(Data Flow)。侧重于数据的提取、转换和装载。
- 执行机制: 并行执行。当启动一个转换时,所有的步骤(Steps)会同时启动。它们各自处于独立的线程中,数据通过“行缓存(Row Set)”以异步队列的方式从上游步骤传递给下游步骤。
- 最小单元: 步骤(Step)。
- 作业 (Job):
- 关注点: 工作流(Workflow)。侧重于控制流、步骤先后顺序、条件分支、定时触发及错误通知。
- 执行机制: 串行执行。严格按照连线箭头的方向,前一步成功或满足条件(True/False)后,才会触发下一步。
- 最小单元: 作业项(Job Entry)。
二、 核心高频组件深度解析
1. 输入组件 (Input)
- 表输入 (Table Input):
- 最常用的组件,通过 SQL 语句从数据库提取数据。
- 高级技巧: 勾选“替换 SQL 语句里的变量”,可以使用
${VAR}动态改变 SQL。如果勾选“从步骤插入数据”,可以通过上游步骤传入的字段,用?占位符实现参数化查询(类似于 PreparedStatement)。
- 文本文件输入 / CSV 文件输入:
- 用于读取扁平文件。需要注意编码格式(如
UTF-8、GBK)以及分隔符(如,、;、\t)。
- 用于读取扁平文件。需要注意编码格式(如
- REST Client / HTTP Post:
- 用于对接外部微服务 API,传入 URL 和参数,直接获取 JSON/XML 响应报文。
2. 转换/清洗组件 (Transform)
- 字段选择 (Select Values):
- 选择/改名: 移除不需要的字段,或者修改字段别名。
- 修改: 强转数据类型(如 String 转 Integer,String 转 Date 并指定格式
yyyy-MM-dd HH:mm:ss)。
- 值映射 (Value Mapper):
- 等价于 SQL 的
CASE WHEN ... THEN ... ELSE。例如将M映射为男,F映射为女,未匹配项给默认值。
- 等价于 SQL 的
- 字符串替换 / 字符串操作:
- 支持正则替换,去除首尾空格(Trim),大小写转换。
- JavaScript 代码 (Modified Java Script Value):
- 提供极其灵活的复杂脚本清洗能力。
- 注意: 内置的是 Rhino 引擎解析 JS,大数据量下(百万级以上)性能会明显下降,能用原生组件(如计算器、字符串操作)代替的尽量代替。
3. 应用/关联组件 (Lookup / Join)
- 数据库查询 (Database Lookup):
- 逐行去目标表查询匹配字段。由于每行数据都要发起一次数据库查询,若未开启缓存,性能极差。在大数据量时必须勾选**“启用缓存(Enable Cache)”**。
- 流查询 (Stream Lookup):
- 将副表的数据全部加载到内存中,然后主表在内存中进行匹配。适合副表数据量较小(如字典表、配置表)的场景,速度极快。
- 合并记录 (Merge Rows / Diff):
- 对比两个结构相同的数据集,自动标记哪些行是
identical(相同)、changed(修改)、new(新增)或deleted(删除)。常用于构建增量同步的基准。
- 对比两个结构相同的数据集,自动标记哪些行是
4. 输出组件 (Output)
- 表输出 (Table Output):
- 采用数据库提供的 JDBC Batch Commit 机制,批量向目标表灌入数据。速度最快。
- 插入/更新 (Insert / Update):
- 根据指定的“用来查询的关键字”在目标表中查找。如果记录存在且其他字段有变动,则执行
UPDATE;如果记录不存在,则执行INSERT。 - 代价: 每行都会进行一次先查后写,在大数据量(几百万条以上)同步时,由于频繁的索引读写与锁竞争,速度较慢。
- 根据指定的“用来查询的关键字”在目标表中查找。如果记录存在且其他字段有变动,则执行
三、 企业级实战场景解决方案
1. 基于时间戳的增量数据抽取架构
在真实的数仓建设中,不可能每天都全量同步千万级的大表。标准方案是利用“增量水位线(时间戳)”。
💡 核心实现逻辑:
- 作业设计:
[START] -> [获取目标表最大时间戳(转换)] -> [抽取并洗数增量数据(转换)] -> [更新/记录日志] -> [SUCCESS] - 第一步(转换 - 获取水位线):
- 用【表输入】查询目标表当前最大的更新时间:
SELECT MAX(update_time) AS max_time FROM dest_table - 连线到【设置变量】,设置一个作业级别的变量
LAST_UPDATE_TIME,值为${max_time}。若无记录,则给默认起始值(如1970-01-01 00:00:00)。
- 用【表输入】查询目标表当前最大的更新时间:
- 第二步(转换 - 抽取增量):
- 用【表输入】从源表抽取,SQL 语句写为:sql
SELECT * FROM source_table WHERE update_time > '${LAST_UPDATE_TIME}' - 勾选“替换 SQL 语句里的变量”。
- 尾端连接【表输出】或【插入/更新】写入目标表。
- 用【表输入】从源表抽取,SQL 语句写为:
2. 动态参数传递与多环境解耦
避免在 KTR 里面写死数据库 IP、用户名和密码。
💡 最佳实践:
- 全局配置文件
kettle.properties:- 在 Kettle 的家目录(通常在用户目录下的
.kettle/文件夹中)找到kettle.properties。 - 在里面配置环境变量:
- 在 Kettle 的家目录(通常在用户目录下的
properties
DB_HOST_PROD = 192.168.1.100
DB_USER_PROD = db_user
DB_PASS_PROD = Encrypted 2be98afc86aa7f2e4cb79ce71da9fa6d4- 在 Spoon 的 DB 连接配置里,主机名直接填写
${DB_HOST_PROD}。 - 命令行动态传参 (Named Parameters):
- 在转换/作业的属性(按下
Ctrl + T或Ctrl + J)的 "Parameters" 选项卡中定义参数名称(如ETL_DATE)。 - 在组件中通过
${ETL_DATE}引用。 - 使用命令行启动时传入:
kitchen.sh -file=job.kjb "-param:ETL_DATE=2026-05-20"。
- 在转换/作业的属性(按下
四、 生产环境性能优化调优
当遇到数仓同步缓慢、甚至卡死的问题时,应从以下四个维度进行全链路调优:
1. JVM 内存优化(解决 OOM)
Kettle 默认分配的堆内存只有 512M 或 1024M,处理大批量数据极易引发 java.lang.OutOfMemoryError: Java heap space。
- 修改文件:
spoon.sh(Linux) 或spoon.bat(Windows) /pan.sh/kitchen.sh - 优化参数: 搜索
PENTAHO_DI_JAVA_OPTIONS,根据服务器物理内存进行调整:
bash
# 调整前
PENTAHO_DI_JAVA_OPTIONS="-Xms1024m -Xmx2048m"
# 调整后 (以16G内存服务器为例,分配 8G 内存给 Kettle)
PENTAHO_DI_JAVA_OPTIONS="-Xms4g -Xmx8g -XX:+UseG1GC"2. 批量提交数 (Commit Size) 调优
在【表输出】组件中,“提交记录数量 (Commit size)”默认为 1000。
- 调优方案:
- 对于普通关系型数据库(MySQL/Oracle),建议将该值调整到
5000到20000之间。 - 如果调得太小(如100),I/O 和事务提交过于频繁,速度极慢;调得太大(如100,000),会导致单次事务占用过多数据库 Undo 表空间,或引发网卡吞吐瓶颈。
- 对于普通关系型数据库(MySQL/Oracle),建议将该值调整到
3. 步骤并发度调优 (Number of Copies)
由于“转换”内的组件是多线程并行的,如果某一个清洗环节(如复杂的【JavaScript 代码】或【数据库查询】)处理得特别慢,它会成为整条流水线的木桶短板(上游连线上的缓冲区 Row Set 会堆满,显示 100%)。
- 优化方案:
- 在卡顿的组件上右键 -> 改变开始复制的数量 (Change Number of Copies)。
- 默认是
1(单线程),可以根据服务器 CPU 核心数,将其调整为3或4。Kettle 会自动利用多线程并发处理该步骤的数据,大幅缓解性能瓶颈。
4. 数据库连接池与共享连接
- 多人在同一个工程下开发时,应当在“主对象树”中右键数据库连接,选择**“共享 (Share)”**。这会把连接信息写入全局,避免在每个 KTR 里面重复配置。
- 在连接配置的 "Pooling" (连接池) 选项卡中启用连接池,设置
Maximum Pool Size(如 20),以避免多线程并发时高频建立/销毁数据库连接。
五、 生产环境自动化调度(版本9.3)
在开发测试完毕后,严禁直接在 UI 界面 (Spoon) 挂着跑任务。必须根据服务器环境将其打包为自动化脚本。
1. 命令行参数内核详解(文件型 vs 资源库型)
- 本地文件型参数 (-file): 用于运行本地
.ktr/.kjb文件。 - 资源库型参数 (/rep): 用于连接 Kettle 数据库或文件资源库,命令如下:
/rep: 资源库名称/user: 资源库用户名/pass: 资源库密码/dir: 资源库中的目录路径(如/或/dw_project)/job或/trans: 资源库中的作业或转换名称
2. Windows 环境 (.bat) 资源库自动化调度脚本模板
如果您的 Kettle 部署在 Windows 服务器环境,且任务保存在 Kettle 资源库 中,可以使用以下批处理脚本。脚本开头利用了 goto 标签作为干净的注释块,便于维护。
batch
@goto label
@param rep 数据库资源库名称
@param user 数据库资源库账户
@param pass 数据库资源库密码
@param dir 资源库内作业所在路径
@param job 作业(Job)名称
@param level 日志打印级别 (Basic/Detailed/Debug)
@param logfile 运行日志输出本地路径
:label:
:: 1. CD至Kettle本地安装路径并切换盘符
E:
CD E:\迅雷下载\data-integration
:: 2. 设置资源库连接参数与作业变量
@SET rep=20orcl
@SET user=admin
@SET pass=admin
@SET dir=/
@SET job=test
@SET level=Basic
@SET logfile=C:\Users\Administrator\Desktop\demo.log
:: 3. 设置需要动态传递给作业的命名参数 (例如传入 hand=0 控制清洗分支)
@SET param="/param:hand=0"
:: 4. 调用 Kitchen 引擎执行资源库中的作业
kitchen /rep %rep% /user %user% /pass %pass% /dir %dir% /job %job% %param% /level %level% /logfile %logfile%3. Linux 环境 (.sh) 本地文件自动化调度脚本模板
若任务在 Linux 环境下以文件形式存放,通常配合 crontab 定时任务调用下方 Shell 脚本:
bash
#!/bin/bash
# 1. 配置环境变量 (防止 crontab 执行时找不到 Java 环境)
export JAVA_HOME=/usr/local/java/jdk1.8.0_211
export PATH=$JAVA_HOME/bin:$PATH
export KETTLE_HOME=/opt/data-integration
# 2. 定义路径变量
KITCHEN_BIN="${KETTLE_HOME}/kitchen.sh"
JOB_PATH="/opt/kettle/projects/dw_finance/main_loader.kjb"
LOG_DIR="/opt/kettle/logs"
LOG_FILE="${LOG_DIR}/job_$(date +%Y%m%d_%H%M%S).log"
mkdir -p ${LOG_DIR}
echo "[$(date "+%Y-%m-%d %H:%M:%S")] === Kettle 任务开始 ===" | tee -a ${LOG_FILE}
# 3. 动态计算业务日期(作为参数传递给 Kettle 作业)
if [ -n "$1" ]; then v_date=$1; else v_date=$(date -d "1 day ago" +%Y-%m-%d); fi
echo "当前处理的业务日期 (BIZ_DATE) 为: ${v_date}" | tee -a ${LOG_FILE}
# 4. 调用 Kitchen 命令执行本地作业
${KITCHEN_BIN} -file=${JOB_PATH} "-param:BIZ_DATE=${v_date}" -level=Basic >> ${LOG_FILE} 2>&1
# 5. 捕获退出状态码并告警
EXIT_CODE=$?
if [ ${EXIT_CODE} -eq 0 ]; then
echo "【SUCCESS】任务执行成功!"
exit 0
else
echo "【ERROR】任务执行失败,退出码: ${EXIT_CODE},请检查日志: ${LOG_FILE}"
exit ${EXIT_CODE}
fi六、 常见故障排查
1. 找不到数据库驱动 (Driver Not Found)
- 现象: 新建连接或跑脚本报错
Driver class 'com.mysql.cj.jdbc.Driver' could not be found。 - 根因: Kettle 属于纯 Java 应用,默认没有打包各类数据库的商业 JDBC 驱动。
- 解决办法: 下载对应数据库版本的驱动
.jar包(如ojdbc8.jar或mysql-connector-java.jar),将其复制到 Kettle 安装目录下的lib/文件夹中。必须彻底关闭并重启 Spoon 客户端或后台服务,驱动才会加载。
2. 字段中文乱码
- 现象: 从源库同步到目标库后,中文全部变成问号
???或乱码。 - 排查步骤:
- 确认输入编码: 若是文件输入,检查【文本文件输入】里的编码格式是否选对了
UTF-8或GBK。 - 确认数据库连接字符串(关键): 在 Kettle 中双击打开你的数据库连接配置,点击 "Advanced" (高级),在下方的空白文本框中加入以下连接参数(以 MySQL 为例):
- 确认输入编码: 若是文件输入,检查【文本文件输入】里的编码格式是否选对了
properties
characterEncoding=utf8
useUnicode=true3. 字段类型溢出、长度不足
- 现象: 报错
Value too long for column或者数据被无故截断。 - 排查步骤:
- 在源表抽取时,若某些字段是
VARCHAR(4000)或TEXT,进入 Kettle 后,Kettle 会在内部流中对字段长度做元数据定义。 - 如果在中间通过【字段选择 (Select Values)】或【串联字符串】改变了字段,请及时在【字段选择】的“元数据 (Meta-data)”选项卡中,显式地对该字段重新定义其长度 (Length)和精度 (Precision),确保下游输出组件能拿到正确的字段宽度。
- 在源表抽取时,若某些字段是