利用java8 的 CompletableFuture 优化 Flink 程序

科技公元 后端 2024-10-15

利用java8 的 CompletableFuture 优化 Flink 程序

一、前言

目前 Flink 利用 avatorscript 脚本语言,来做到规则的自动化更新。avatorscript将表达式直接翻译成对应的 java 字节码执行,所以在大数据量的情况下,自然而然这里就成为了瓶颈

二、Flink 代码优化

2.0 问题发现

利用java8 的 CompletableFuture 优化 Flink 程序通过 Flink UI 发现 window 算子是瓶颈,而 window 算子的核心就是 avatorscript 表达式

2.1 原有代码java

代码解读
复制代码
xxx AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true); xxx

经过测试平均执行时间在1毫秒以内,但经不住数据量大,所以Flink QPS一直在 11w 左右

2.2 CompletableFuture 优化java

代码解读
复制代码
xxx List<CompletableFuture> executeFutures=new ArrayList<>(); CompletableFuture<Object> executeFuture = CompletableFuture.supplyAsync(() -> { return AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true); }); executeFutures.add(executeFuture); for (int i = 0; i < executeFutures.size(); i++) { executeFutures.get(i).get() xxxx }

修改完上线后,Flink QPS 有原来 11W 增加到 17W 左右

三、avatorscript 使用的简单介绍

为了让你更容易理解 avatorscript,这里我们也可以先简单的介绍一下:

3.1 自定义函数java

代码解读
复制代码
class AddFunction extends AbstractFunction { @Override public AviatorObject call(Map<String, Object> env, AviatorObject arg1, AviatorObject arg2) { Number left = FunctionUtils.getNumberValue(arg1, env); Number right = FunctionUtils.getNumberValue(arg2, env); return new AviatorDouble(left.intValue() + right.intValue()); } public String getName() { return "add" ; } } public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException { //注册函数 AviatorEvaluator.addFunction(new AddFunction()); System.out.println(AviatorEvaluator.execute( "add(2,1)" )); }

3.2 从 Map 中取值java

代码解读
复制代码
public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException { //注册函数 AviatorEvaluator.addFunction(new AddFunction()); HashMap<String, Object> stringObjectHashMap = new HashMap<>(); stringObjectHashMap.put( "testId1" , 1); stringObjectHashMap.put( "testId2" , 2); Object execute = AviatorEvaluator.execute( "add(testId1,testId2)" , stringObjectHashMap);

3.3 使用 Java 的工具类java

代码解读
复制代
public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException { HashMap<String, Object> stringObjectHashMap = new HashMap<>(); stringObjectHashMap.put( "ip" , "a1111" ); // stringObjectHashMap.put("result", "a&B&C&d"); stringObjectHashMap.put( "voucher_endtime" , "2022.03.02 11:32" ); stringObjectHashMap.put( "imei2" , "v1aaaaaa1" ); stringObjectHashMap.put( "testId" , "v1ot_service_quality_1111" ); stringObjectHashMap.put( "testId1" , "sku" ); stringObjectHashMap.put( "a" , "123" ); stringObjectHashMap.put( "a1" , "null" ); stringObjectHashMap.put( "b1" , 123); AviatorEvaluator.addStaticFunctions( "doubleStatic" , Double.class); AviatorEvaluator.addInstanceFunctions( "doubleInstance" , Double.class) execute2 = AviatorEvaluator.execute( "(doubleStatic.valueOf(sys_net_bandwidth))" , stringObjectHashMap); System.out.println(execute2); execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(sys_net_bandwidth)) " , stringObjectHashMap); System.out.println( "###" + execute2); execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(str(voucher)))" , stringObjectHashMap);

3.4 AviatorScript 函数java

代码解读
复制代码
## examples/function.av fn add(x, y) { return x + y; } p(add(1,2))java
代码解读
复制代码
public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException { String function = "## examples/function.av\n" + "\n" + "fn add(x, y) {\n" + " return x + y;\n" + "}" ; AviatorEvaluator.defineFunction( "add" , function); System.out.println( "defineFunction6666================+" + AviatorEvaluator.execute( "add(1,2)" , stringObjectHashMap)); }

四、总结

本文主要介绍了 Flink 中使用 avatorscript 脚本语言的问题,以及如何通过 CompletableFuture 优化代码来提高 Flink QPS。同时,还介绍了 avatorscript 的使用方法,包括自定义函数、从 Map 中取值、使用 Java 工具类和 AviatorScript 函数。通过本文的介绍,读者可以更好地了解 Flink 中 avatorscript 的使用方法,以及如何优化代码来提高 Flink QPS。

转载来源:https://juejin.cn/post/7372114027840094223

Apipost 私有化火热进行中

评论