// ExternalDeliveryDefault.java
package org.linkedopenactors.rdfpub.adapter.driven;
import java.security.PrivateKey;
import java.time.Duration;
import org.linkedopenactors.rdfpub.adapter.driven.HttpSignature.SignatureResponse;
import org.linkedopenactors.rdfpub.app.activity.DetermineReceiverService;
import org.linkedopenactors.rdfpub.app.actor.KeyPairGenerator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import org.springframework.web.server.ResponseStatusException;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
@Service
@Slf4j
public class ExternalDeliveryDefault //implements //org.linkedopenactors.rdfpub.app.activity.ExternalDelivery,
// ApplicationListener<CreateActivityOutboxReceived>
{
@Autowired
private DetermineReceiverService determineReceiverService;
@Autowired
private HttpSignature httpSignature;
@Autowired
private KeyPairGenerator keyPairGenerator;
@Value("${instance.domain}")
private String instanceDomain;
// @Override
// public void onApplicationEvent(CreateActivityOutboxReceived event) {
public void onApplicationEvent(org.linkedopenactors.rdfpub.domain.commonsrdf.Activity activity, org.linkedopenactors.rdfpub.domain.commonsrdf.Actor actor) {
// org.linkedopenactors.rdfpub.domain.commonsrdf.Activity activity = event.getActivity();
// ActivityPubObject object = event.getObject();
// String body = activityPubObjectTranslater.translate(activity, object);
String body = activity.toString();
// org.linkedopenactors.rdfpub.domain.commonsrdf.Actor actor = event.getActor();
determineReceiverService.getExternalReceiverInboxIris(activity).forEach(receiver->{
deliver(actor.getSubject().toString(), actor.getSubject().toString(), receiver.toString(), body);
});
}
private void deliver(String actorsPublicKeyId, String actorsIdentifier, String inbox, String body) {
actorsPublicKeyId = actorsPublicKeyId + "/publicKey";
// TODO ^^ i did not like this distributed url thing, how do we solve this ??
PrivateKey privateKey = keyPairGenerator.getPrivateKey(actorsIdentifier);
// inbox = "http://pasture_http_signature/";
SignatureResponse signature = httpSignature.signature(actorsPublicKeyId, privateKey, inbox, HttpMethod.POST, body);
WebClient webClient = WebClient
.builder()
.filters(exchangeFilterFunctions -> {
exchangeFilterFunctions.add(logRequest());
exchangeFilterFunctions.add(logResponse());
})
.build();
var request =
webClient
.post()
.uri(inbox)
.header("host", signature.getHost())
.header("date", signature.getDate())
.header("digest", signature.getDigest())
.header("signature", signature.getSignatureString())
.header("Content-Type", "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"");
String responseString = "";
try {
responseString = request
.body(BodyInserters.fromValue(body))
.retrieve()
.onStatus(HttpStatus::is5xxServerError,
response -> Mono.error(new ResponseStatusException(response.statusCode())))
.onStatus(HttpStatus::is4xxClientError, ClientResponse::createException)
.bodyToMono(String.class)
.retryWhen(Retry.backoff(1, Duration.ofSeconds(2))
.filter(throwable -> throwable instanceof ResponseStatusException))
.block();
} catch (WebClientResponseException e) {
log.debug("ResponseBodyAsString: " + e.getResponseBodyAsString());
e.printStackTrace();
}
log.debug("response: " + responseString);
}
private ExchangeFilterFunction logResponse() {
return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
if (log.isDebugEnabled()) {
StringBuilder sb = new StringBuilder("Response: \n");
clientResponse
.headers()
.asHttpHeaders()
.forEach((name, values) -> sb.append("\n\theader: " + name + " - " + values));
sb.append("\n\tstatus: " + clientResponse.statusCode());
log.debug(sb.toString());
}
return Mono.just(clientResponse);
});
}
private ExchangeFilterFunction logRequest() {
return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
if (log.isDebugEnabled()) {
StringBuilder sb = new StringBuilder("Request: \n");
sb.append("url: " + clientRequest.url());
clientRequest
.headers()
.forEach((name, values) -> sb.append("\n\theader: " + name + " - " + values));
sb.append("body: \n" + clientRequest.body().toString());
log.debug(sb.toString());
}
return Mono.just(clientRequest);
}); }
}