Elasticsearch Ingest

Elasticsearch 5.x引入了一组强大的功能,针对通过摄取节点(ingest node)摄取文档时出现的问题。
elasticsearch的节点可以是

  • 主节点(master node)
  • 数据节点(data node)
  • 摄取节点(ingest node)

将摄取组件与其他组件分开的想法是为了避免由于在预处理文档期间可能出现的问题, 来创建更稳定的集群。为了创建一个更稳定的集群,在发生某些问题的情况下,如
由于其他插件而导致的崩溃以及由于复杂的类型操作而导致的高负载,摄像节点应该和主节点或数据节点隔离。

Pipeline 定义

ingest nodes的工作就是在文档被发送到data node之前对文档进行预处理操作。这个过程称为流水线定义(pipeline definition),此流水线的每一步都是处理器定义(processor definition)。
为了定义有一个ingestion pipeline,我们需要提供一些processors. 比如下面定义了一个pipline,增加一个字段user,他的值为john:

{
    "description": "Add user john field",
    "processors": [
        {
            "set": {
                "field": "user",
                "value": "john"
            }
        }
    ]
}

通用的模板形式为:

{
    "description": "...",
    "processors": [ ... ],
    "version": 1,
    "on_failure": [ ... ]
}
  • description 该描述包含了pipeline的描述信息。 如果您在集群中存储了大量管道,这是非常有用的。
  • processors 包含processors操作的列表。 他们将按顺序执行。
  • version 这个字段是可选的,但它在跟踪管道版本方面非常有用。
  • on_failure 允许我们定义要在正常流水线执行期间出现故障时要应用的处理器列表。

为了防止在缺少字段或类似约束的情况下失败,一些processors提供了ignore_failure属性。
例如,具有处理缺失字段的重命名字段的管道应该以这种方式定义:

{
    "description": "my pipeline with handled exceptions",
    "processors": [
        {
            "rename": {
                "field": "foo",
                "target_field": "bar",
                "ignore_failure": true
            }
        }
    ]
}

Put an ingest pipeline

pipeline definition的功能是在不重新启动节点的情况下进行更新和创建(与Logstash相比)。 定义通过put pipeline API存储在集群状态中。
在定义了一个管道之后,我们需要将它提供给Elasticsearch集群。为了存储或者更新pipeline,我们需要进行下面的步骤:

curl -XPUT 'http://127.0.0.1:9200/_ingest/pipeline/add-user-john' -d '
{
    "description": "Add user john field",
    "processors": [
        {
            "set": {
                "field": "user",
                "value": "john"
            }
        }
    ],
    "version": 1
}'

如果执行成功的话会返回:

{"acknowledged":true}

PUT pipeline对于创建和更新pipeline都是可用的,管道以群集状态存储,并立即传播到所有摄取节点。 当摄入节点接收到新的管道时,它们将更新其节点内存中的管道表示:管道更改立即生效。在集群中存储管道时,请注意为其提供有意义的名称(在示例中为add-user-john),以便轻松了解管道的作用。在put调用中使用的流水线的名称将是其他流水线中的管道的ID。 在将您的管道存储在Elasticsearch之后,可以将提供管道名称的文档作为查询参数进行索引。比如:

curl -XPUT http://localhost:9200/my_index/my_type/my_id?pipeline=add-user-john -d '{}'

Get an ingest pipeline

在存储了管道以后,就可以查询他了:

curl -XGET 'http://127.0.0.1:9200/_ingest/pipeline/add-user-john'

返回的结果为:

{
    "add-user-john": {
        "description": "Add user john field",
        "processors": [
            {
                "set": {
                    "field": "user",
                    "value": "john"
                }
            }
        ],
        "version": 1
    }
}

如果想一次性获得所有的管道:

curl -XGET 'http://127.0.0.1:9200/_ingest/pipeline/*'

如果想使用通配符获取部分的管道:

curl -XGET 'http://127.0.0.1:9200/_ingest/pipeline/add-*'

如果你仅仅想拿到管道的部分信息,可以使用filter_path

curl -XGET 'http://127.0.0.1:9200/_ingest/pipeline/add-user-john? filter_path=*.version'

将只返回version信息:

{
    "add-user-john": {
        "version": 1
    }
}

Delete an ingest pipeline

curl -XDELETE 'http://127.0.0.1:9200/_ingest/pipeline/add-user-john'

删除管道API不允许在管道名称/ ID中使用通配符。
由于管道由于其集群级存储而保持在每个节点的内存中,并且管道始终在摄入节点中运行; 最好的做法是只保留集群中所需的管道。

Simulate an ingest pipeline

每个架构的摄取部分非常敏感,所以Elasticsearch团队已经创建了模拟您的管道的可能性,而无需将其存储在Elasticsearch中。 模拟管道API允许用户测试/改进和检查管道的功能,而不需要Elasticsearch集群中进行部署。
我们需要同时提交需要处理的文档以及对应的pipeline:

curl -XPOST 'http://127.0.0.1:9200/_ingest/pipeline/_simulate' -d '
{
    "pipeline": {
        "description": "Add user john field",
        "processors": [
            {
                "set": {
                    "field": "user",
                    "value": "john"
                }
            },
            {
                "set": {
                    "field": "job",
                    "value": 10
                }
            }
        ],
        "version": 1
    },
    "docs": [
        {
            "_index": "index",
            "_type": "type",
            "_id": "1",
            "_source": {
                "name": "docs1"
            }
        },
        {
            "_index": "index",
            "_type": "type",
            "_id": "2",
            "_source": {
                "name": "docs2"
            }
        }
    ]
}'

如果执行成功,那么es返回的结果如下:

{
    "docs": [
        {
            "doc": {
                "_index": "index",
                "_id": "1",
                "_type": "type",
                "_source": {
                    "name": "docs1",
                    "job": 10,
                    "user": "john"
                },
                "_ingest": {
                    "timestamp": "2016-12-10T13:33:24.375+0000"
                }
            }
        },
        {
            "doc": {
                "_index": "index",
                "_id": "2",
                "_type": "type",
                "_source": {
                    "name": "docs2",
                    "job": 10,
                    "user": "john"
                },
                "_ingest": {
                    "timestamp": "2016-12-10T13:33:24.375+0000"
                }
            }
        }
    ]
}

Built-in processors

Elasticsearch默认提供了一大批摄取处理器。我们在这里仅仅展示一些比较常用的:

curl -XPOST 'http://127.0.0.1:9200/_ingest/pipeline/_simulate?pretty' -d '
{
    "pipeline": {
        "description": "Testing some build-processors",
        "processors": [
            {
                "dot_expander": {
                    "field": "extfield.innerfield"
                }
            },
            {
                "remove": {
                    "field": "unwanted"
                }
            },
            {
                "trim": {
                    "field": "message"
                }
            },
            {
                "set": {
                    "field": "tokens",
                    "value": "{{message}}"
                }
            },
            {
                "split": {
                    "field": "tokens",
                    "separator": "\\s+"
                }
            },
            {
                "sort": {
                    "field": "tokens",
                    "order": "desc"
                }
            },
            {
                "convert": {
                    "field": "mynumbertext",
                    "target_field": "mynumber",
                    "type": "integer"
                }
            }
        ]
    },
    "docs": [
        {
            "_index": "index",
            "_type": "type",
            "_id": "1",
            "_source": {
                "extfield.innerfield": "booo",
                "unwanted": 32243,
                "message": " 155.2.124.3 GET /index.html 15442 0.038 ",
                "mynumbertext": "3123"
            }
        }
    ]
}'

返回的结果为:

{
    "docs": [
        {
            "doc": {
                "_index": "index",
                "_type": "type",
                "_id": "1",
                "_source": {
                    "mynumbertext": "3123",
                    "extfield": {
                        "innerfield": "booo"
                    },
                    "tokens": [
                        "GET",
                        "155.2.124.3",
                        "15442",
                        "0.038",
                        "/index.html"
                    ],
                    "message": "155.2.124.3 GET /index.html 15442 0.038",
                    "mynumber": 3123
                },
                "_ingest": {
                    "timestamp": "2016-12-10T16:49:40.875+0000"
                }
            }
        }
    ]
}
本文版权归作者所有,禁止一切形式的转载,复制等操作
赞赏

微信赞赏支付宝赞赏

发表评论

电子邮件地址不会被公开。 必填项已用*标注