目前 Flink 利用 avatorscript 脚本语言,来做到规则的自动化更新。avatorscript将表达式直接翻译成对应的 java 字节码执行,所以在大数据量的情况下,自然而然这里就成为了瓶颈
通过 Flink UI 发现 window 算子是瓶颈,而 window 算子的核心就是 avatorscript 表达式
代码解读复制代码xxx
AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);
xxx
经过测试平均执行时间在1毫秒以内,但经不住数据量大,所以Flink QPS一直在 11w 左右
代码解读复制代码xxx
List<CompletableFuture> executeFutures=new ArrayList<>();
CompletableFuture<Object> executeFuture = CompletableFuture.supplyAsync(() -> {
return AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);
});
executeFutures.add(executeFuture);
for (int i = 0; i < executeFutures.size(); i++) {
executeFutures.get(i).get()
xxxx
}
修改完上线后,Flink QPS 有原来 11W 增加到 17W 左右
为了让你更容易理解 avatorscript,这里我们也可以先简单的介绍一下:
代码解读复制代码class AddFunction extends AbstractFunction {
@Override
public AviatorObject call(Map<String, Object> env,
AviatorObject arg1, AviatorObject arg2) {
Number left = FunctionUtils.getNumberValue(arg1, env);
Number right = FunctionUtils.getNumberValue(arg2, env);
return new AviatorDouble(left.intValue() + right.intValue());
}
public String getName() {
return "add" ;
}
}
public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
//注册函数
AviatorEvaluator.addFunction(new AddFunction());
System.out.println(AviatorEvaluator.execute( "add(2,1)" ));
}
代码解读复制代码public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
//注册函数
AviatorEvaluator.addFunction(new AddFunction());
HashMap<String, Object> stringObjectHashMap = new HashMap<>();
stringObjectHashMap.put( "testId1" , 1);
stringObjectHashMap.put( "testId2" , 2);
Object execute = AviatorEvaluator.execute( "add(testId1,testId2)" , stringObjectHashMap);
代码解读复制代public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
HashMap<String, Object> stringObjectHashMap = new HashMap<>();
stringObjectHashMap.put( "ip" , "a1111" );
// stringObjectHashMap.put("result", "a&B&C&d");
stringObjectHashMap.put( "voucher_endtime" , "2022.03.02 11:32" );
stringObjectHashMap.put( "imei2" , "v1aaaaaa1" );
stringObjectHashMap.put( "testId" , "v1ot_service_quality_1111" );
stringObjectHashMap.put( "testId1" , "sku" );
stringObjectHashMap.put( "a" , "123" );
stringObjectHashMap.put( "a1" , "null" );
stringObjectHashMap.put( "b1" , 123);
AviatorEvaluator.addStaticFunctions( "doubleStatic" , Double.class);
AviatorEvaluator.addInstanceFunctions( "doubleInstance" , Double.class)
execute2 = AviatorEvaluator.execute( "(doubleStatic.valueOf(sys_net_bandwidth))" , stringObjectHashMap);
System.out.println(execute2);
execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(sys_net_bandwidth)) " , stringObjectHashMap);
System.out.println( "###" + execute2);
execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(str(voucher)))" , stringObjectHashMap);
代码解读复制代码## examples/function.av
fn add(x, y) {
return x + y;
}
p(add(1,2))java
代码解读复制代码public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
String function = "## examples/function.av\n" +
"\n" +
"fn add(x, y) {\n" +
" return x + y;\n" +
"}" ;
AviatorEvaluator.defineFunction( "add" , function);
System.out.println( "defineFunction6666================+" + AviatorEvaluator.execute( "add(1,2)" , stringObjectHashMap));
}
本文主要介绍了 Flink 中使用 avatorscript 脚本语言的问题,以及如何通过 CompletableFuture 优化代码来提高 Flink QPS。同时,还介绍了 avatorscript 的使用方法,包括自定义函数、从 Map 中取值、使用 Java 工具类和 AviatorScript 函数。通过本文的介绍,读者可以更好地了解 Flink 中 avatorscript 的使用方法,以及如何优化代码来提高 Flink QPS。