核心内容摘要
48
简简单单 Online zuozuo 本心、输入输出、结果文章目录使用 Logstash 进行数据摄取从 PostgreSQL 到 Elasticsearch前言
什么是 Logstash
Windows 下安装 Logstash
安装 JDBC 驱动并创建管道配置
Filter 与 Output 说明
运行管道与验证
方案优点
方案缺点与适用场景使用 Logstash 进行数据摄取从 PostgreSQL 到 Elasticsearch编辑 | 简简单单 Online zuozuo地址 | https://blog.csdn.net/qq_15071263如果觉得本文对你有帮助欢迎关注、点赞、收藏、评论谢谢前言本文介绍如何使用 Logstash 将 PostgreSQL 中的数据同步到 Elasticsearch涵盖 Logstash 的基本概念、在 Windows 下的安装步骤、基于 JDBC 的增量摄取管道配置以及 Input、Filter、Output 各阶段的说明。
文末会
总结该方案的优缺点与适用场景便于你在实际项目中选型与落地。
#Logstash #PostgreSQL #Elasticsearch #数据摄取 #JDBC #ELK #数据同步
什么是 LogstashLogstash 是 Elastic 旗下的开源数据处理管道用于从各类数据源采集、转换并输出到不同目标例如 Elasticsearch、Kafka、平面文件等。
一条 Logstash 管道通常包含三个阶段Input输入数据来源负责从数据源拉取待摄取的数据。
Filter过滤对数据进行清洗、聚合、解析等转换常用插件包括 Grok、Mutate、Date 等。
Output输出数据写入的目标如 Elasticsearch、文件、数据库等。
将数据通过 Logstash 写入 Elastic 前需满足以下前置条件本机已安装 Logstash并配备 PostgreSQL 的 JDBC 驱动。
具备可用的 PostgreSQL 数据库且存在需要同步的表或可通过函数查询的数据。
已有运行中的 Elasticsearch 实例。
Windows 下安装 Logstash下面简要说明在本地安装并运行 Logstash 的步骤。
安装 Java从 Oracle 官网 下载 JDKJava 8 或更高版本解压到指定目录。
解压完成后需要配置环境变量以便系统识别 Java 命令新建环境变量JAVA_HOME指向 JDK 安装目录并在系统Path中追加%JAVA_HOME%\bin。
在命令行中执行以下命令可验证是否安装成功java -version若配置正确会输出当前 Java 版本信息。
安装 Logstash从 Elastic 官网 下载 Logstash 安装包并解压到指定目录。
在命令行中进入 Logstash 解压目录下的bin目录执行bin\logstash -einput { stdin { } } output { stdout { } }若出现正常启动并等待 stdin 输入的提示说明 Logstash 已能本地运行。
安装 JDBC 驱动并创建管道配置安装 PostgreSQL JDBC 驱动从 PostgreSQL 官网 下载 PostgreSQL JDBC 驱动将得到的.jar文件放到 Logstash 可访问的目录例如 Logstash 安装目录下的lib或单独目录。
创建 Logstash 管道配置文件下面是一份用于增量摄取的示例管道配置管道会记录上次运行位置并按调度如每分钟只摄取自上次运行以来有变化的数据。
input{jdbc{jdbc_driver_libraryC:/path/to/postgresql-
x.x.jarjdbc_driver_classorg.postgresql.Driverjdbc_connection_stringjdbc:postgresql://localhost:5432/your_databasejdbc_useryour_usernamejdbc_passwordyour_passwordschedule* * * * *statementSELECT * FROM your_table WHERE updated_at :sql_last_value ORDER BY updated_atuse_column_valuetruetracking_columnupdated_attracking_column_typetimestamplast_run_metadata_pathC:/path/to/last_run_metadata}}filter{mutate{remove_field[date]}mutate{rename{first_namename}}}output{elasticsearch{hosts[http://localhost:9200]userelasticpasswordyour_elastic_passwordindexyour_index_namedocument_id%{id}}}Input 部分要点说明jdbc_driver_libraryPostgreSQL JDBC 驱动.jar文件所在路径。
jdbc_driver_class使用的驱动类名PostgreSQL 为org.postgresql.Driver。
jdbc_connection_stringPostgreSQL 连接字符串。
jdbc_user / jdbc_password数据库用户名与密码。
paging可按页拉取数据如每页 1000 条有利于性能与进度控制。
schedule调度表达式例如* * * * *表示每分钟执行一次与 cron 格式一致。
statement要执行的 SQL。
复杂 SQL 可写入单独.sql文件并在配置中使用statement_filepath指向该文件。
增量相关use_column_value设为true时Logstash 使用tracking_column对应列如updated_at的实际值作为:sql_last_value而不是上次执行时间。
tracking_column/tracking_column_type用于增量跟踪的列名与类型。
last_run_metadata_path保存上次运行位置的元数据文件路径供下次运行读取。
Filter 与 Output 说明FilterFilter 为可选阶段用于在写入目标前对数据进行清洗或转换。
上述示例中使用mutate删除了date字段并将源数据中的first_name映射为目标中的name字段。
OutputOutput 定义数据写入的目标。
本例中为 Elasticsearch需配置hosts、认证信息如有、索引名index以及document_id。
在增量摄取场景下建议显式设置document_id如%{id}使用主键。
写入时 Elasticsearch 会按该 ID 查找文档若已存在则更新同一条文档避免重复。
若不设置document_id每次都会生成新文档容易产生重复数据。
运行管道与验证在命令行中进入 Logstash 安装目录执行将配置文件路径替换为你的实际路径bin\logstash -f C:\path\to\your_pipeline.conf管道启动后可在控制台看到拉取与写入的日志。
在 Kibana 或通过 Elasticsearch API 查询对应索引即可验证数据是否按预期从 PostgreSQL 同步到 Elasticsearch。
方案优点Logstash 开源、生态成熟易于在现有环境中部署和集成。
提供大量插件200可通过 Filter 完成解析、转换、聚合等复杂处理。
数据源与 Elasticsearch 解耦便于更换数据源或目标。
与 Elasticsearch 集成简单配置清晰。
方案缺点与适用场景缺点延迟不适合对极低延迟或实时性要求很高的场景管道越复杂加载、转换和发送耗时越长。
错误与监控若不单独做监控与告警出错时较难排查可能造成数据遗漏或重复。
重复数据若未正确设置document_id或增量逻辑容易产生重复文档。
启动时间相比部分轻量工具Logstash 启动较慢。
配置维护使用类 YAML 的配置文件管道复杂时维护成本会上升。
资源占用在高负载或复杂管道下CPU、内存占用较高。
适用场景该方案适合需要稳定、集中式的批处理/准实时数据管道的场景例如将业务库PostgreSQL数据定期或按增量同步到 Elasticsearch 做检索与分析。
若需求是毫秒级实时流或超大规模实时同步可再评估 Kafka Connector 或 Elasticsearch 自身 CDC 等方案。
生如逆旅一苇以航欢迎关注、欢迎联系交流、欢迎沟通想法、欢迎交换意见、欢迎合作咨询感谢亲的关注、点赞、收藏、评论一键三连支持谢谢