技术解析

请教一个 flink ontimer 触发的问题
0
2021-06-08 11:33:01
idczone

为什么刚注册完定时器,下面的 onTimer 就立刻被触发了?有点费解。

我的理解:不是基于事件时间的时间戳 + 我自己设置的延迟时间,等水位线到了这个时间点才触发吗?

ps:代码复制下 直接可以跑

public class TwoStreamsJoin {

   

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        KeyedStream orderStream = env
                .fromElements(
                        new OrderEvent("order_1", "pay", 1616497937786L))
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.minutes(10)) {
                    @Override
                    public long extractTimestamp(OrderEvent element) {
                        return element.eventTime;
                    }
                })
                .keyBy(r -> r.orderId);

        KeyedStream payStream = env
                .fromElements(
                        new PayEvent("order_2", "weixin", 1616497937786L)
                )
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.minutes(5)) {
                    @Override
                    public long extractTimestamp(PayEvent element) {
                        return element.eventTime;
                    }
                })
                .keyBy(r -> r.orderId);

        SingleOutputStreamOperator result = orderStream
                .connect(payStream)
                .process(new CoProcessFunction() {
                    private ValueState orderState;
                    private ValueState payState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        orderState = getRuntimeContext().getState(
                                new ValueStateDescriptor("order", OrderEvent.class)
                        );
                        payState = getRuntimeContext().getState(
                                new ValueStateDescriptor("pay", PayEvent.class)
                        );
                    }

                    @Override
                    public void processElement1(OrderEvent orderEvent, Context context, Collector collector) throws Exception {
                        PayEvent pay = payState.value();
                        if (pay != null) {
                            payState.clear();
                            collector.collect("order id " + orderEvent.orderId + " matched success");
                        } else {
                            orderState.update(orderEvent);
                            context.timerService().registerEventTimeTimer(orderEvent.eventTime + 500000L);
                        }
                    }

                    @Override
                    public void processElement2(PayEvent payEvent, Context context, Collector collector) throws Exception {
                        OrderEvent order = orderState.value();
                        if (order != null) {
                            orderState.clear();
                            collector.collect("order id" + payEvent.orderId + " matched success");
                        } else {
                            payState.update(payEvent);
                            context.timerService().registerEventTimeTimer(payEvent.eventTime + 500000L);
                        }
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
                        System.out.println("@@@@@"+ ctx.timerService().currentWatermark());
                        if (orderState.value() != null) {
                           System.out.println("触发了 timer");
                        }
                        if (payState.value() != null) {
                            System.out.println("触发了 timer");
                        }
                    }
                });

        result.print();
        

        env.execute();
    }


    public static class OrderEvent {
        public String orderId;
        public String eventType;
        public Long eventTime;
        public Long timestamp;

        public OrderEvent(String orderId, String eventType, Long eventTime) {
            this.orderId = orderId;国外服务器
            this.eventType = eventType;
            this.eventTime = eventTime;
            this.timestamp = eventTime;
        }

        public OrderEvent() { }
    }


    public static class PayEvent {
        public String orderId;
        public String eventType;
        public Long eventTime;
        public Long timestamp;
        public PayEvent(String orderId, String eventType, Long eventTime) {
            this.orderId = orderId;
            this.eventType = eventType;
            this.eventTime = eventTime;
            this.timestamp = eventTime;

        }

        public PayEvent() {
        }

    }
}





数据地带为您的网站提供全球顶级IDC资源
在线咨询
专属客服