淘宝运营

随着淘宝基于内容的策略的发展,对实时产品选择的需求日益增强。对于某些对实时性有严格要求的产品,淘宝运营人员希望他们定义的产品池能够实时生效,以供文章编写者立即使用。为此,可以使用流计算来实现实时产品选择。本文介绍了阿里巴巴的Blink实时流计算技术,该技术用于实现实时产品选择,并在试验中取得了令人满意的结果。

问题分析

为了实现实时产品选择,必须解决以下问题:

实时触发:在需要触发数据源进行流计算并将用户提交的要素数据存储在IDB中的情况下,IDB如何与Blink计算过程相关联?

中间状态存储:在Blink计算过程中,必须根据业务场景记录以前的中间计算状态。在这种情况下,如何在需要时实时存储和读取这些中间状态?

实时验证:鉴于Blink计算结果最终将在搜索引擎上生效,Blink如何与搜索引擎进行交互,以便结果可以实时生效?

增量数据:如果无法进行增量数据处理,则在离线传输完整数据期间,部分更新的数据将被覆盖。在这种情况下,如何追加增量数据?

淘宝运营

“ TT +眨眼+ HBase +
Swift”解决方案解决了这些问题。具体来说,TimeTunnel(TT)可以解决实时触发问题,HBase可以解决中间状态存储问题,Swift可以解决实时验证和增量数据添加问题。TT,HBase和Swift的描述如下:

TT:作为阿里云日志收集系统,它允许用户订阅日志。另外,它支持IDB和Blink,并且是IDB和Blink之间的重要交互介质。

HBase:这是一个开源的非关系分布式数据库产品。它可以与Blink正确连接,并可以用于存储和读取中间计算状态。

Swift:这是阿里巴巴搜索业务部门开发的消息传递系统。当前,主要搜索引擎通过Swift系统实时传输消息。Swift可用于解决引擎的实时验证和增量数据添加问题。

淘宝运营实施过程

Blink流程的工作量分散到六个节点,分别是日志解析节点,查询拆分节点,SP请求节点,数据处理节点,TT回写节点和Swift消息传递节点。实时计算过程如下:

用户提交用于产品选择的特征数据,该特征数据存储在IDB中并同步到TT日志。

TT日志的更新将触发闪烁任务,并且日志解析节点解析TT日志以检索用于产品选择的功能数据。

查询拆分节点估计所需SPU的数量,根据SPU的数量确定并发请求的数量,并连接SP参数。

SP请求节点发送并发的SP服务请求以获得SPU信息。

数据处理节点从HBase读取中间状态,并基于服务逻辑执行计算。

数据处理节点将计算结果写回到HBase数据库中,以进行下一次计算。

TT写回节点和Swift消息节点分别将计算结果写回到TT和Swift。

转储模块接受Swift消息并将数据更新到引擎以使其实时生效。

TT记录计算结果并将其写回ODPS,以离线进行完整计算。

实施细节

产品选择功能的实现主要取决于开发Blink任务。在开发Blink任务之前,您必须了解UDF,UDTF和UDAF的概念。

Blink开发期间的主要任务是实现UDF。首先,基于流计算过程划分多个计算节点,例如,查询拆分节点和SP请求节点在实现过程中是独立的计算节点。然后,必须基于每个节点的实现逻辑来确定和实现UDF类。下面以SP请求节点为例,详细说明实现过程。

节点分析: SP请求节点的业务场景是一对多的过程。因此,UDTF类用于实现。

UDTF类封装:此类需要继承TableFunction,其中TableFunction是为其本身定义的pojo,并传递给下一个运行的节点。

节点输出:您需要定义自己的pojo类(即上一步中提到的“ T”项),以便该节点的输出在下一个节点上可见。

主要功能关联:
Blink开发过程需要一个主要功能来关联每个计算节点以进行流计算。我们建议您使用Scala语言开发主要功能,以促进对代码的理解。

参考代码

以下列出了SP请求节点的UDTF实现代码。该代码的基本思想是将SP的返回结果同时输出到下一个节点。

public class SearchEngineUdtf extends TableFunction {

private static final Logger logger =
LoggerFactory.getLogger(SearchEngineUdtf.class);

/**

* Request the engine to retrieve the recall field

* @param params

*/

public void eval(String params) {

SpuSearchResult spuSearchResult =
SpuSearchEngineUtil.getFromSpuSearch(params);

if(spuSearchResult.getSuccess()){

// Result parsing

JSONObject kxuanObj =
SpuSearchEngineUtil.getSpResponseJson(spuSearchResult, “sp_kxuan”);

if(null == kxuanObj || kxuanObj.isEmpty()){

logger.error(“sp query: ” + spuSearchResult.getSearchURL());

logger.error(String.format(“[%s],%s”, Constant.ERR_PAR_SP_RESULT,”get
key:sp_kxuan data failed! “));

}else {

List engineFieldsList = SpuSearchEngineUtil.getSpAuction(kxuanObj);

// Concurrently output to the data stream

for(EngineFields engineFields : engineFieldsList){

collect(engineFields);

}

}

}else {

logger.error(String.format(“[%s],%s”,Constant.ERR_REQ_SP, “request
SpuEngine failed!”));

}

}

}

启动闪烁任务

当前,在集群中启动Blink任务的过程尚未完全自动化。在完成Blink任务的开发之后,您需要按照上图所示的过程启动它。启动任务后,您可以登录到Yarn以检查任务节点的运行状态。

概括

启动该功能后,具有超过10,000个SPU的选择池可以在几分钟内生效,从而极大地提高了淘宝运营的产品选择效率。

By 中正

发表评论

邮箱地址不会被公开。 必填项已用*标注