jetlinks / jetlinks-community

JetLinks 基于Java8,Spring Boot 2.x ,WebFlux,Netty,Vert.x,Reactor等开发, 是一个全响应式的企业级物联网平台。支持统一物模型管理,多种设备,多种厂家,统一管理。统一设备连接管理,多协议适配(TCP,MQTT,UDP,CoAP,HTTP等),屏蔽网络编程复杂性,灵活接入不同厂家不同协议等设备。实时数据处理,设备告警,消息通知,数据转发。地理位置,数据可视化等。能帮助你快速建立物联网相关业务系统。
https://www.jetlinks.cn/
Apache License 2.0
5.65k stars 1.68k forks source link

如何在jetlinks 不使用响应式的r2dbc 而同时接入jdbc 完成对报表类的需求的开发等等 #547

Closed zhangle1 closed 2 months ago

zhangle1 commented 2 months ago

流式api 不管是回调地狱还是嵌套,维护成本太高了...

zhou-hao commented 2 months ago

没听说过reactor有回调地狱问题。reactive-streams规范的框架应该都没有。如果你硬要在lambda中写200行逻辑。用啥都白搭。

zhangle1 commented 2 months ago

image image 这是目前在做的几张报表... 一个是优化速度问题, 一个是嵌套问题, 目前已经影响到自己开发了 ,可能用sql只用一两个小时搞定的事 用流式做的话很痛苦。

zhangle1 commented 2 months ago

举个例子 假设我希望 查几个数据集 我希望能够直接 var entityOne=.Query(),var entityTwo=.Query("sql") 这样子我可以在代码里直接拼接。

zhou-hao commented 2 months ago

那直接执行sql不就行了? 见QueryHelper 或者 ReactiveSqlExecutor

zhangle1 commented 2 months ago

queryHelper.select 吗 这个要咋用 有例子吗

zhangle1 commented 2 months ago

他返回的还是Mono 对象 直接使用.block(); 好像就报错了

zhou-hao commented 2 months ago

你接口返回Flux就行了

zhangle1 commented 2 months ago

代码是这样的 查了一次之后好像就会报错block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-ht 请教一下 是我哪里使用有问题吗 我想后续优化成单表先全查出来 然后在代码里拼接报表

   public Flux<PagerResult<GkProductEffectReportDto>> queryProductEffectReport(@RequestBody GkReportQueryEntity query) {

        String workshopConition = "";

        String productLineConition = "";
//        and product_line_name like '%67A%'
        if (query.workshop != null && !query.workshop.equals("") && !query.workshop.equals("全部")) {
            workshopConition = " and e.work_shop_code like '%" + query.workshop + "%'";
        }

        if (query.productLine != null && !query.productLine.equals("") && !query.productLine.equals("全部")) {
            productLineConition = " and e.product_line_name like '%" + query.productLine + "%'";
        }

        String today = LocalDate.now().toString();

        String startDateTime = today + " 00:00:00";
        String startDateTime2 = today + " 00:00:00";
        String endDateTime = today + " 23:59:59";

        if (query.getDatePicker() != null && !query.getDatePicker().isEmpty()) {
            startDateTime = query.getDatePicker() + " 00:00:00";
            endDateTime = query.getDatePicker() + " 23:59:59";
        }
        //如果是当天的,查询设备表的数量
        String tableName = "LEFT JOIN dev_device_instance instance ON e.code = instance.id";
        if (!startDateTime2.equals(startDateTime)) {
            tableName = "LEFT JOIN g_device_benefit instance ON e.code = instance.device_code and Date(instance.time) = date('" + startDateTime + "')";
        }
        return Flux.just(queryHelper.select("SELECT\n" +
                                "    e.code, e.section, e.product_line_name, e.product_line_code, e.standard_qty, e.standard_cycle_sec,\n" +
                                "    instance.day_qty,\n" +
                                "    e.name,\n" +
                                "    dss.device_code,\n" +
                                "    dss.start_hours,\n" +
                                "    dss.run_hours,\n" +
                                "    des.type1_hours,\n" +
                                "    des.type2_hours,\n" +
                                "    des.type3_hours,\n" +
                                "    (dss.start_hours - dss.run_hours - COALESCE(des.type1_hours, 0) - COALESCE(des.type2_hours, 0) - COALESCE(des.type3_hours, 0)) AS chazhi\n" +
                                "FROM\n" +
                                "    g_equipment e\n" +
                                "LEFT JOIN (\n" +
                                "    SELECT\n" +
                                "        device_code,\n" +
                                "        SUM(EXTRACT(EPOCH FROM CASE WHEN state IN (1, 2) THEN date_trunc('second', end_time) - date_trunc('second', start_time) ELSE INTERVAL '0' END)) AS run_hours,\n" +
                                "        EXTRACT(EPOCH FROM AGE(MAX(date_trunc('second', end_time)), MIN(date_trunc('second', start_time)))) AS start_hours\n" +
                                "    FROM\n" +
                                "        g_device_state\n" +
                                "    WHERE\n" +
                                "        start_time >= DATE_TRUNC('day', TO_TIMESTAMP('" +
                                startDateTime +
                                "', 'YYYY-MM-DD HH24:MI:SS'))\n" +
                                "        AND end_time < DATE_TRUNC('day', TO_TIMESTAMP('" +
                                endDateTime +
                                "', 'YYYY-MM-DD HH24:MI:SS')) + INTERVAL '1 day'\n" +
                                "    GROUP BY\n" +
                                "        device_code\n" +
                                ") AS dss ON dss.device_code = e.code\n" +
                                "LEFT JOIN (\n" +
                                "    SELECT\n" +
                                "        device_code,\n" +
                                "        MAX(CASE WHEN error_type in ('4', '5') THEN error_hours END) AS type1_hours,\n" +
                                "        MAX(CASE WHEN error_type in ('2', '3') THEN error_hours END) AS type2_hours,\n" +
                                "        MAX(CASE WHEN error_type = '1' THEN error_hours END) AS type3_hours\n" +
                                "    FROM (\n" +
                                "        SELECT\n" +
                                "            gds.device_code,\n" +
                                "            gde.error_type,\n" +
                                "            SUM(EXTRACT(EPOCH FROM (date_trunc('second', gds.end_time) - date_trunc('second', gds.start_time))) * gde.go_on_time / t.total_go_on_time) AS error_hours\n" +
                                "        FROM\n" +
                                "            g_device_state gds\n" +
                                "        INNER JOIN\n" +
                                "            g_device_error_detail_his gde ON gds.id = gde.state_id\n" +
                                "        INNER JOIN (\n" +
                                "            SELECT\n" +
                                "                gde.state_id,\n" +
                                "                SUM(gde.go_on_time) AS total_go_on_time\n" +
                                "            FROM\n" +
                                "                g_device_error_detail_his gde\n" +
                                "            GROUP BY\n" +
                                "                gde.state_id\n" +
                                "        ) t ON gde.state_id = t.state_id\n" +
                                "        WHERE\n" +
                                "            gds.start_time >= DATE_TRUNC('day', TIMESTAMP '" +
                                startDateTime +
                                "')\n" +
                                "            AND gds.end_time < DATE_TRUNC('day', TIMESTAMP '" +
                                endDateTime +
                                "') + INTERVAL '1 day' \n" +
                                "            AND gds.state = 3\n" +
                                "        GROUP BY\n" +
                                "            gds.device_code, gde.error_type\n" +
                                "    ) AS error_hours\n" +
                                "    GROUP BY\n" +
                                "        device_code\n" +
                                ") AS des ON des.device_code = e.code\n" +
                                tableName +
//                                        "LEFT JOIN dev_device_instance instance ON e.code = instance.id" +
                                " where 1=1   " + workshopConition + productLineConition +
                                "ORDER BY code asc"
                        , GkProductEffectReportDto::new)
                .where(query)
                .fetchPaged().block());

    }
zhou-hao commented 2 months ago

直接返回查询的结果啊。还用啥Flux.just。 可以看看文档https://hanta.yuque.com/px7kg1/dev/dcwrzvngbg46ls8v

zhangle1 commented 2 months ago

我懂你的意思.. 所以我合并多个流是要这样吗.. var SourceOne= queryHelper.select() var SourceTwo= queryHelper.select()

Mono.zip(SourceOne,SourceTwo,BiFunction()=>{})

zhou-hao commented 2 months ago

我懂你的意思..

所以我合并多个流是要这样吗..

var SourceOne= queryHelper.select()

var SourceTwo= queryHelper.select()

Mono.zip(SourceOne,SourceTwo,BiFunction()=>{})

需要拿到2个操作的结果进行处理就可以用zip。 要把两个结果合并为一个 可以用Flux.merge。 可以看看这些操作符的注释,上面有数据流图。

zhangle1 commented 2 months ago

嗯 api 响应式api很像rxjava... 但是我感觉真的编程感受 体验不好,尤其是看同事的代码...
更喜欢有异步await 那种语法的语言. var entity= await query

zhou-hao commented 2 months ago

和rxjava都属于reactive-streams规范。 java里没有await,so....