// ExternalDeliveryDefault.java

package org.linkedopenactors.rdfpub.adapter.driven;

import java.security.PrivateKey;
import java.time.Duration;

import org.linkedopenactors.rdfpub.adapter.driven.HttpSignature.SignatureResponse;
import org.linkedopenactors.rdfpub.app.activity.DetermineReceiverService;
import org.linkedopenactors.rdfpub.app.actor.KeyPairGenerator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import org.springframework.web.server.ResponseStatusException;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

@Service
@Slf4j
public class ExternalDeliveryDefault //implements //org.linkedopenactors.rdfpub.app.activity.ExternalDelivery,
//		ApplicationListener<CreateActivityOutboxReceived> 
{

	@Autowired
	private DetermineReceiverService determineReceiverService; 

	@Autowired
	private HttpSignature httpSignature;
	
	@Autowired
	private KeyPairGenerator keyPairGenerator; 
	

	@Value("${instance.domain}") 
	private String instanceDomain;
	
//	@Override
//	public void onApplicationEvent(CreateActivityOutboxReceived event) {
		public void onApplicationEvent(org.linkedopenactors.rdfpub.domain.commonsrdf.Activity activity, org.linkedopenactors.rdfpub.domain.commonsrdf.Actor actor) {
//		org.linkedopenactors.rdfpub.domain.commonsrdf.Activity activity =  event.getActivity();
//		ActivityPubObject object = event.getObject();
		
//		String body = activityPubObjectTranslater.translate(activity, object);
		String body = activity.toString();
//		org.linkedopenactors.rdfpub.domain.commonsrdf.Actor actor = event.getActor();
		determineReceiverService.getExternalReceiverInboxIris(activity).forEach(receiver->{
			deliver(actor.getSubject().toString(), actor.getSubject().toString(), receiver.toString(), body);	
		});		
	}
		
    private void deliver(String actorsPublicKeyId, String actorsIdentifier, String inbox, String body) {    	

    	actorsPublicKeyId = actorsPublicKeyId + "/publicKey";
    	// TODO ^^ i did not like this distributed url thing, how do we solve this ??
    	
    	PrivateKey privateKey = keyPairGenerator.getPrivateKey(actorsIdentifier);
        
//      inbox = "http://pasture_http_signature/";
        
        SignatureResponse signature = httpSignature.signature(actorsPublicKeyId, privateKey, inbox, HttpMethod.POST, body);
        
        WebClient webClient = WebClient
        .builder()
        .filters(exchangeFilterFunctions -> {
            exchangeFilterFunctions.add(logRequest());
            exchangeFilterFunctions.add(logResponse());
        })
        .build();
        
        var request =
                webClient
                        .post()
                        .uri(inbox)
                        .header("host", signature.getHost())
                        .header("date", signature.getDate())
                        .header("digest", signature.getDigest())
                        .header("signature", signature.getSignatureString())
                        .header("Content-Type", "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"");
        
        String responseString = "";
        try {
			responseString = request
					.body(BodyInserters.fromValue(body))
			        .retrieve()
			        .onStatus(HttpStatus::is5xxServerError,
			                response -> Mono.error(new ResponseStatusException(response.statusCode())))
			        .onStatus(HttpStatus::is4xxClientError, ClientResponse::createException)
			        .bodyToMono(String.class)
			        .retryWhen(Retry.backoff(1, Duration.ofSeconds(2))
			                .filter(throwable -> throwable instanceof ResponseStatusException))
			        .block();
		} catch (WebClientResponseException e) {
			log.debug("ResponseBodyAsString: " + e.getResponseBodyAsString());
			e.printStackTrace();
		}

        log.debug("response: " + responseString);
        
    }

	private ExchangeFilterFunction logResponse() {
		 return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
		        if (log.isDebugEnabled()) {
		            StringBuilder sb = new StringBuilder("Response: \n");
		            clientResponse
		              .headers()
		              .asHttpHeaders()
		              .forEach((name, values) -> sb.append("\n\theader: " + name + " - " + values));
		            sb.append("\n\tstatus: " + clientResponse.statusCode());
		            log.debug(sb.toString());
		        }
		        return Mono.just(clientResponse);
		    });
		}

	private ExchangeFilterFunction logRequest() {
		 return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
		        if (log.isDebugEnabled()) {
		            StringBuilder sb = new StringBuilder("Request: \n");
		            sb.append("url: " + clientRequest.url());
		            clientRequest
		              .headers()
		              .forEach((name, values) -> sb.append("\n\theader: " + name + " - " + values));
		            sb.append("body: \n" + clientRequest.body().toString());
		            log.debug(sb.toString());
		        }
		        return Mono.just(clientRequest);
		    });	}
}