技术解析
为什么一注册定时器,下面的定时器立马触发,有点费解
我的理解 不是基于我时间时间的时间戳+我自己设置的时间 到了这个点才触发吗
ps:代码复制下 直接可以跑
public class TwoStreamsJoin {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
KeyedStream orderStream = env
.fromElements(
new OrderEvent("order_1", "pay", 1616497937786L))
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.minutes(10)) {
@Override
public long extractTimestamp(OrderEvent element) {
return element.eventTime;
}
})
.keyBy(r -> r.orderId);
KeyedStream payStream = env
.fromElements(
new PayEvent("order_2", "weixin", 1616497937786L)
)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.minutes(5)) {
@Override
public long extractTimestamp(PayEvent element) {
return element.eventTime;
}
})
.keyBy(r -> r.orderId);
SingleOutputStreamOperator result = orderStream
.connect(payStream)
.process(new CoProcessFunction() {
private ValueState orderState;
private ValueState payState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
orderState = getRuntimeContext().getState(
new ValueStateDescriptor("order", OrderEvent.class)
);
payState = getRuntimeContext().getState(
new ValueStateDescriptor("pay", PayEvent.class)
);
}
@Override
public void processElement1(OrderEvent orderEvent, Context context, Collector collector) throws Exception {
PayEvent pay = payState.value();
if (pay != null) {
payState.clear();
collector.collect("order id " + orderEvent.orderId + " matched success");
} else {
orderState.update(orderEvent);
context.timerService().registerEventTimeTimer(orderEvent.eventTime + 500000L);
}
}
@Override
public void processElement2(PayEvent payEvent, Context context, Collector collector) throws Exception {
OrderEvent order = orderState.value();
if (order != null) {
orderState.clear();
collector.collect("order id" + payEvent.orderId + " matched success");
} else {
payState.update(payEvent);
context.timerService().registerEventTimeTimer(payEvent.eventTime + 500000L);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
System.out.println("@@@@@"+ ctx.timerService().currentWatermark());
if (orderState.value() != null) {
System.out.println("触发了 timer");
}
if (payState.value() != null) {
System.out.println("触发了 timer");
}
}
});
result.print();
env.execute();
}
public static class OrderEvent {
public String orderId;
public String eventType;
public Long eventTime;
public Long timestamp;
public OrderEvent(String orderId, String eventType, Long eventTime) {
this.orderId = orderId;国外服务器
this.eventType = eventType;
this.eventTime = eventTime;
this.timestamp = eventTime;
}
public OrderEvent() { }
}
public static class PayEvent {
public String orderId;
public String eventType;
public Long eventTime;
public Long timestamp;
public PayEvent(String orderId, String eventType, Long eventTime) {
this.orderId = orderId;
this.eventType = eventType;
this.eventTime = eventTime;
this.timestamp = eventTime;
}
public PayEvent() {
}
}
}