package tech.powerjob.remote.akka; import akka.actor.AbstractActor; import akka.actor.Props; import akka.japi.pf.ReceiveBuilder; import lombok.extern.slf4j.Slf4j; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.remote.framework.actor.ActorInfo; import tech.powerjob.remote.framework.actor.HandlerInfo; import tech.powerjob.remote.framework.base.HandlerLocation; import tech.powerjob.remote.framework.utils.RemoteUtils; import java.lang.reflect.Method; import java.util.Optional; /** * ไปฃ็†็”จ็š„ actor * * @author tjq * @since 2023/1/6 */ @Slf4j public class AkkaProxyActor extends AbstractActor { private final Receive receive; private final ActorInfo actorInfo; public static Props props(ActorInfo actorInfo) { return Props.create(AkkaProxyActor.class, () -> new AkkaProxyActor(actorInfo)); } public AkkaProxyActor(ActorInfo actorInfo) { this.actorInfo = actorInfo; final ReceiveBuilder receiveBuilder = receiveBuilder(); actorInfo.getHandlerInfos().forEach(handlerInfo -> { final HandlerLocation location = handlerInfo.getLocation(); final Method handlerMethod = handlerInfo.getMethod(); final Optional> powerSerializeClz = RemoteUtils.findPowerSerialize(handlerMethod.getParameterTypes()); if (!powerSerializeClz.isPresent()) { throw new PowerJobException("build proxy for handler failed due to handler args is not PowerSerialize: " + location); } final Class bindClz = powerSerializeClz.get(); receiveBuilder.match(bindClz, req -> onReceiveProcessorReportTaskStatusReq(req, handlerInfo)); }); this.receive = receiveBuilder.build(); } @Override public Receive createReceive() { return receive; } private void onReceiveProcessorReportTaskStatusReq(T req, HandlerInfo handlerInfo) { try { final Object ret = handlerInfo.getMethod().invoke(actorInfo.getActor(), req); if (ret == null) { return; } if (ret instanceof Optional) { if (!((Optional) ret).isPresent()) { return; } } getSender().tell(ret, getSelf()); } catch (Exception e) { log.error("[PowerJob-AKKA] process failed!", e); } } }