Error Handling Elements in Apache Beam Pipelines

Published: 2019-06-22


I have noticed a deficit of documentation and examples outside of the official Beam docs, since data pipelines are often intimately linked with business logic. While working with streaming pipelines, I developed a simple error handling technique to reduce the disruption that errors cause to streaming or long-running jobs. Here is an explanation of that technique, along with a simple demo pipeline.

Apache Beam is a high-level model for programming data processing pipelines. It provides language interfaces in both Java and Python, though Java support is more feature-complete.

Beam supports running in two modes: batch and streaming. In batch mode, a finite data set is read in, processed, then output in one huge chunk. Streaming mode allows data to be continuously read in from a streaming source (such as a message queue), processed in small chunks, and output as processing occurs. Streaming allows analytics to be performed in “real time” as events occur. This is extremely valuable for telemetry and logging, where engineers or other systems need feedback as events happen.

Beam pipelines are composed of a series of typed data sets (PCollections), and transforms. Transforms take a PCollection, perform a programmer-defined operation on the collection elements, then output zero or more new PCollections as a result.
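
As a minimal illustration (the file name and types here are placeholders, not from the original post), a pipeline that turns lines of text into their lengths might look like:

// Build a pipeline: read lines from a file, map each line to its length.
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);

PCollection<String> lines =
    pipeline.apply(TextIO.read().from("input.txt"));  // placeholder path
PCollection<Integer> lengths = lines.apply(
    MapElements.into(TypeDescriptors.integers())
        .via((String line) -> line.length()));

pipeline.run().waitUntilFinish();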

The problem with these transforms is that they need to eventually operate on data. As anyone familiar with handling user input or data from large systems can attest, that data can be malformed, or just unexpected. If a bad piece of data enters the system, it may cause the entire pipeline to crash. This is a waste of time and compute resources at best, but can also result in losing in-memory streaming data, or disrupting downstream systems relying on the Beam output.

In order to stop a catastrophic failure, you need graceful error handling in your pipeline. The easiest way to do this is to add try/catch blocks within each transform, which prevent a shutdown and allow all other elements to be processed.

 

A basic try/catch around a string conversion.
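
A minimal sketch of that idea (names are illustrative, and LOG is assumed to be an org.slf4j.Logger):

// DoFn that converts strings to integers, skipping (rather than
// crashing on) malformed elements.
static class SafeStringToInt extends DoFn<String, Integer> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    try {
      c.output(Integer.parseInt(c.element()));
    } catch (NumberFormatException e) {
      // Swallow the bad element so the rest of the pipeline keeps running.
      LOG.warn("Could not parse element: {}", c.element(), e);
    }
  }
}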

This is a start, but it’s not enough on its own. You’ll want to record failures: what data failed, in which transform, and why. To do this, you’ll want to create a data structure to store these errors, and an additional output for them.

The data structure for a failure should contain:

  • Source data in some form (data ID, the raw data fed into the transform, or the raw data precursor that was fed into the pipeline).
  • The reason for the failure.
  • The transform that failed.
 

Example constructor of a Failure object.
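
A sketch of such a class (the field names are my assumption, not necessarily those of the demo repo):

// Holds enough context to diagnose a failed element later.
// Serializable so Beam can encode it in a PCollection.
public class Failure implements Serializable {
  private final String sourceData;       // raw input fed to the transform
  private final String reason;           // e.g. the exception message
  private final String failedTransform;  // name of the failing transform

  public Failure(String sourceData, String reason, String failedTransform) {
    this.sourceData = sourceData;
    this.reason = reason;
    this.failedTransform = failedTransform;
  }
}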

We can instantiate a Failure if an exception or error is thrown during a transform.

 

Parsing some fields out of auditd log strings. In this example, we use an inappropriately small number type. If the number is too large for an Integer, the transform outputs a Failure object, and continues processing elements.
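
A hedged reconstruction of that transform (extractNumericField, the tag name, and the Failure signature are assumptions; the secondary-output mechanism is covered in the next section):

// Parses a numeric field out of an auditd log line.
// On overflow or malformed input, emits a Failure to a secondary output.
static class ParseAuditLine extends DoFn<String, Integer> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    try {
      // Integer is deliberately too small for some of the demo's inputs.
      c.output(Integer.parseInt(extractNumericField(c.element())));
    } catch (Exception e) {
      // failureTag is the TupleTag<Failure> declared alongside the
      // success tag (see the PCollectionTuple example below).
      c.output(failureTag, new Failure(c.element(), e.toString(), "ParseAuditLine"));
    }
  }
}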

Next, we need to be able to record the failure for developers to reference.

Beam transforms have a single output PCollection by default, but they can output multiple PCollections. A transform can return a PCollectionTuple, which uses TupleTag objects as keys: a tag marks which PCollection an element is emitted into, and is later used to fetch that PCollection back out of the tuple. This has many uses, and we can use it here to separately output a PCollection of successful results and a PCollection of Failure objects.

 

Accessing the PCollections stored in a PCollectionTuple.
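
In code, that looks roughly like this (tag and type names assumed, reusing the ParseAuditLine sketch above):

// Declare one tag per output; the anonymous subclasses capture the types.
final TupleTag<Integer> successTag = new TupleTag<Integer>() {};
final TupleTag<Failure> failureTag = new TupleTag<Failure>() {};

// successTag is the main output; failureTag rides along as an additional output.
PCollectionTuple results = lines.apply(
    ParDo.of(new ParseAuditLine())
        .withOutputTags(successTag, TupleTagList.of(failureTag)));

PCollection<Integer> successes = results.get(successTag);
PCollection<Failure> failures = results.get(failureTag);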

In the demo repo, successes and failures are simply written to files. In a real pipeline, they would likely be sent to a database, or a message queue for additional processing or reporting.

You may also want to extend coverage beyond handling thrown exceptions. For example, we could validate that all data is present and falls within expected parameters (e.g., all user IDs are ≥ 0), to prevent logical errors, missing records, or DB insertion failures further along. That validation could be folded into the Failure class, or it could be a new Invalid class and PCollection.
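
For instance, a validation check inside a transform might look like this (record, invalidTag, and the Invalid class are hypothetical):

// Route structurally parseable but logically bad records to their own output.
if (record.getUserId() < 0) {
  c.output(invalidTag, new Invalid(record, "userId must be >= 0"));
  return;
}
c.output(record);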

This covers the handling of elements themselves, but there are many design decisions beyond that, such as: what next? Data scientists or developers must review the errors and discard data that is outright bad. If data is merely in an unexpected format, or exposed a now-fixed bug in the pipeline, then that data should be re-processed. It’s common (more so in batch pipelines) to retry a whole dataset after any bugs in the pipeline are addressed. This is time-consuming, but easy to support, and allows grouped data (sums, aggregates, etc.) to be corrected by adding the missing data. Some pipelines may only retry individual elements, if the pipeline is a one-in, one-out process.

There is a GitHub repo showing the full proof of concept using auditd log files.

 

final TupleTag<Output> successTag = new TupleTag<Output>() {};
final TupleTag<Input> deadLetterTag = new TupleTag<Input>() {};

PCollection<Input> input = /* … */;

PCollectionTuple outputTuple = input.apply(ParDo.of(
    new DoFn<Input, Output>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        try {
          c.output(process(c.element()));
        } catch (Exception e) {
          // LOG is an org.slf4j.Logger.
          LOG.error("Failed to process input {} -- adding to dead letter file",
              c.element(), e);
          c.output(deadLetterTag, c.element());
        }
      }
    }).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

// Write the dead letter inputs to a BigQuery table for later analysis.
outputTuple.get(deadLetterTag)
    .apply(BigQueryIO.write(...));

// Retrieve the successful elements...
PCollection<Output> success = outputTuple.get(successTag);
// ... and continue processing as desired.

 
