在我的项目中,我必须撰写一个休息客户端,它将从休息服务接收一个 HttpResponse 作为未来。我想要的是记录回应的状态代码,如果出现任何例外,也要记录该例外。我怎样才能使用管道模式来实作这一点。PFB 我的代码片段:
class MetadataAggregator(implicit config: Config) extends Actor with ActorLogging {
import context.system
import akka.pattern.pipe
implicit val exec = context.system.dispatcher
val url = sys.env.getOrElse("URL", config.getString("conf.url"))
override def receive: Receive = {
case MetadataEvent(event, phase, topic) =>
val payload = captureMetadata(event, phase, topic)
publishMetadata(payload)
case _ => //Do nothing
}
def publishMetadata(payload: String) = {
log.info(s"Metadata payload : $payload")
val responseFuture = Http().singleRequest(
HttpRequest(
HttpMethods.POST,
uri = url,
entity = HttpEntity(
ContentTypes.`application/json`,
payload
)
)
)
responseFuture.pipeTo(self)
}
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
case ex: RuntimeException =>
log.error(s"${self.path} incurred an exception. $ex...")
ex.printStackTrace()
log.info("Resuming...")
Resume
case any: Any =>
log.error(s"${self.path} stopping due to $any ...")
any.printStackTrace()
Stop
}
}
Bdw,我不能使用 akka 型别化的actor,因为整个项目都在使用非型别化的actor。
uj5u.com热心网友回复:
该pipeTo
呼叫正在将 发送HttpResponse
给参与者,因此您需要在receive
方法中处理它。但我建议创建一条包含有效负载和回应的新讯息,并将其发送到self
. 这允许您描述导致回应的负载。
HttpResponse
正在被捕获并忽略,因此case _ =>
记录任何意外讯息通常是一个好主意,以便更早地捕获此类事情。
示例代码:
为结果创建一个新类:
case class PublishResult(payload: String, result: Try[HttpResponse])
在publishMetadata
:
val responseFuture = Http().singleRequest(???)
responseFuture.onComplete{ res =>
self ! PublishResult(payload, res)
}
在receive
添加这个处理程序:
case PublishResult(payload, res) =>
res match {
case Failure(e) =>
log.warn("Request payload {} failed: {}", payload, e.getMessage)
case Success(response) =>
log.debug("Request succeeded")
}
0 评论