// ActivityPubObjectDefault.java

package org.linkedopenactors.rdfpub.domain.commonsrdf;

import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.rdf.api.Graph;
import org.apache.commons.rdf.api.IRI;
import org.apache.commons.rdf.api.Literal;
import org.apache.commons.rdf.api.RDF;
import org.apache.commons.rdf.api.RDFTerm;
import org.apache.commons.rdf.api.Triple;
import org.linkedopenactors.rdfpub.domain.commonsrdf.vocab.AS;
import org.linkedopenactors.rdfpub.domain.commonsrdf.vocab.OWL;
import org.linkedopenactors.rdfpub.domain.commonsrdf.vocab.PROV;
import org.linkedopenactors.rdfpub.domain.commonsrdf.vocab.Rdf;
import org.linkedopenactors.rdfpub.domain.commonsrdf.vocab.Vocabularies;

import com.neovisionaries.i18n.LanguageCode;

import lombok.extern.slf4j.Slf4j;

@Slf4j
class ActivityPubObjectDefault implements ActivityPubObject {

	private Graph graph;
	protected RDF rdf;
	private IRI subject;
	private GraphToStringConverter graphToStringConverter;
	private SubjectProvider subjectProvider;
	private Vocabularies vocabularies;
	private InstanceProperties instanceProperties;

	public ActivityPubObjectDefault(RDF rdf, IRI subject, Graph graph, GraphToStringConverter graphToStringConverter, SubjectProvider subjectProvider, 
			Vocabularies vocabularies, InstanceProperties instanceProperties) {		
		this.rdf = rdf;
		this.subject = subject;
		this.vocabularies = vocabularies;
		this.instanceProperties = instanceProperties;
		this.graph = Optional.ofNullable(graph).orElse(rdf.createGraph()); // TODO ACHTUNG, ist das wieder ein parallelstreamiger ???
		this.graphToStringConverter = graphToStringConverter;
		this.subjectProvider = subjectProvider;
	}
	
	@Override
	public Set<IRI> getAttributedtTo() {
		return getIris(rdf.createIRI(AS.attributedTo));
	}

	protected Set<IRI> getIris(IRI predicate) {				
		Stream<? extends Triple> stream = graph.stream(subject, predicate, null);
		Set<IRI> collect = stream
				.map(Triple::getObject)
				.map(IRI.class::cast)
				.collect(Collectors.toSet());
		log.debug("getIris("+subject+", "+predicate+", null) -> " + collect);
		return collect;
	}

	protected Optional<IRI> getIri(IRI predicate) {
		Stream<? extends Triple> stream = graph.stream(getSubject(), predicate, null);
		Set<IRI> collect = stream
				.map(Triple::getObject)
				.filter(IRI.class::isInstance)
				.map(IRI.class::cast)
				.collect(Collectors.toSet());
		if(collect.size()>1) {
			throw new IllegalStateException("Exact one hit for '"+predicate+"' expected, but was: " + collect.size());
		}
		return collect.stream().findFirst();
		// TODO use also getRdfTerm(IRI predicate) and do filtering on the result
	}

	protected Optional<RDFTerm> getRdfTerm(IRI predicate) {
		Stream<? extends Triple> stream = graph.stream(getSubject(), predicate, null);
		Set<RDFTerm> collect = stream
				.map(Triple::getObject)
				.collect(Collectors.toSet());
		if(collect.size()>1) {
			throw new IllegalStateException("Exact one hit for '"+predicate+"' expected, but was: " + collect.size());
		}
		return collect.stream().findFirst();
	}

	@Override
	public void setAttributedTo(Set<IRI> attributedtTo) {
		set(rdf.createIRI(AS.attributedTo), attributedtTo);		
	}
	
	@Override
	public void addAttributedTo(IRI attributedtTo) {
		add(rdf.createIRI(AS.attributedTo), attributedtTo);		
	}

	@Override
	public Set<IRI> getAudience() {
		return getIris(rdf.createIRI(AS.audience));
	}

	@Override
	public void setAudience(Set<IRI> audience) {
		set(rdf.createIRI(AS.audience), audience);
	}

	@Override
	public void addAudience(IRI audience) {
		add(rdf.createIRI(AS.audience), audience);		
	}

	@Override
	public Optional<String> getName() {
		// TODO contentMap
		return graph.stream(subject, rdf.createIRI(AS.name), null)
				.findFirst()
				.map(Triple::getObject)
				.map(Literal.class::cast)
				.map(Literal::getLexicalForm);
	}

	@Override
	public void setName(LanguageCode languageCode, String name) {
		// TODO contentMap
		set(rdf.createIRI(AS.name), rdf.createLiteral(name));
	}

	@Override
	public Optional<String> getContent() {
		// TODO contentMap
		return graph.stream(subject, rdf.createIRI(AS.content), null).findFirst().map(Triple::getObject).map(RDFTerm::toString);
	}

	@Override
	public Optional<String> getName(LanguageCode languageCode) {
		return graph.stream(subject, rdf.createIRI(AS.name), null).findFirst().map(Triple::getObject).map(RDFTerm::toString);
	}

	@Override
	public void setName(String name) {
		set(rdf.createIRI(AS.name), rdf.createLiteral(name));
	}

	@Override
	public void setContent(String content) {
		set(rdf.createIRI(AS.content), rdf.createLiteral(content));
	}

	@Override
	public Optional<Instant> getPublished() {
		return graph.stream(subject, rdf.createIRI(AS.published), null)
				.findFirst()
				.map(Triple::getObject)
				.map(Literal.class::cast)
				.map(this::toInstant);
	}

	@Override
	public void setPublished(Instant instant) {
		set(rdf.createIRI(AS.published), fromInstant(instant));
	}

	@Override
	public void setUpdated(Instant instant) {
		set(rdf.createIRI(AS.updated), fromInstant(instant));		
	}

	@Override
	public Optional<String> getSummary() {
		return graph.stream(subject, rdf.createIRI(AS.summary), null).findFirst().map(Triple::getObject).map(RDFTerm::toString);
	}

	@Override
	public Optional<String> getSummary(LanguageCode languageCode) {
		// TODO name map !!
		return graph.stream(subject, rdf.createIRI(AS.summary), null).findFirst().map(Triple::getObject).map(RDFTerm::toString);
	}

	@Override
	public void setSummary(LanguageCode languageCode, String summary) {
		// TODO name map !!
		set(rdf.createIRI(AS.summary), rdf.createLiteral(summary));
	}

	@Override
	public void setSummary(String summary) {
		set(rdf.createIRI(AS.summary), rdf.createLiteral(summary));
	}
	

//	@Override
//	public void addUpdated(Instant instant) {
//		add(rdf.createIRI(AS.updated), fromInstant(instant));		
//	}

	@Override
	public Set<IRI> getTo() {
		return getIris(rdf.createIRI(AS.to));
	}

	@Override
	public void setTo(Set<IRI> to) {
		set(rdf.createIRI(AS.to), to);
	}

	@Override
	public void addTo(IRI to) {
		add(rdf.createIRI(AS.to), to);		
	}

	@Override
	public Set<IRI> getBto() {
		return getIris(rdf.createIRI(AS.bto));
	}

	@Override
	public void setBto(Set<IRI> bto) {
		set(rdf.createIRI(AS.bto), bto);
	}

	@Override
	public void addBto(IRI bto) {
		add(rdf.createIRI(AS.bto), bto);		
	}

	@Override
	public Set<IRI> getCc() {
		return getIris(rdf.createIRI(AS.cc));
	}

	@Override
	public void setCc(Set<IRI> cc) {
		set(rdf.createIRI(AS.cc), cc);
	}

	@Override
	public void addCc(IRI cc) {
		add(rdf.createIRI(AS.cc), cc);
	}

	@Override
	public Set<IRI> getBcc() {
		return getIris(rdf.createIRI(AS.bcc));
	}

	@Override
	public void setBcc(Set<IRI> bcc) {
		set(rdf.createIRI(AS.bcc), bcc);
	}

	@Override
	public void addBcc(IRI bcc) {
		add(rdf.createIRI(AS.bcc), bcc);
	}

	@Override
	public Set<IRI> getReceivers() {
		HashSet<IRI> receivers = new HashSet<>();
		receivers.addAll(getAudience());
		receivers.addAll(getTo());
		receivers.addAll(getBto());
		receivers.addAll(getCc());
		receivers.addAll(getBcc());
		return receivers;
	}

	@Override
	public void hideBlindReceivers() {
		setBcc(Collections.emptySet());
		setBto(Collections.emptySet());
	}

	@Override
	public boolean isAttributedTo(IRI requestedActorId) {
		return getIris(rdf.createIRI(AS.attributedTo)).contains(requestedActorId);
	}

	@Override
	public boolean isReceiver(IRI requestedActorId) {
		return getReceivers().contains(requestedActorId);
	}

//	@Override
//	public void unify(ActivityPubObject activity) {
//		// TODO Auto-generated method stub
//		
//	}

//	@Override
//	public void setNamespaces(Map<String, IRI> namespaces) {
//		// TODO Auto-generated method stub
//		
//	}
//
//	@Override
//	public ActivityPubObject addNamespaces(Map<String, IRI> namespaces) {
//		// TODO Auto-generated method stub
//		return null;
//	}
//
//	@Override
//	public ActivityPubObject addNamespace(String prefix, IRI namespace) {
//		// TODO Auto-generated method stub
//		return null;
//	}

	@Override
	public IRI getSubject() {
		return subject;
	}

//	@Override
//	public Map<String, IRI> namespaces() {
//		// TODO Auto-generated method stub
//		return null;
//	}

	@Override
	public void setType(Set<IRI> type) {
		set(rdf.createIRI(Rdf.type), type);
	}

	@Override
	public void setWasAttributedTo(IRI actor) {
		set(rdf.createIRI(PROV.wasAttributedTo), actor);
	}

	@Override
	public Optional<IRI> getWasAttributedTo() {
		return getIri(rdf.createIRI(PROV.wasAttributedTo));
	}

	@Override
	public void setWasGeneratedBy(IRI subject) {
		set(rdf.createIRI(PROV.wasGeneratedBy), subject);		
	}

	@Override
	public Optional<IRI> getWasGeneratedBy() {
		return getIri(rdf.createIRI(PROV.wasGeneratedBy));
	}

	@Override
	public void setWasRevisionOf(IRI subject) {
		set(rdf.createIRI(PROV.wasRevisionOf), subject);		
	}

	@Override
	public Optional<IRI> getWasRevisionOf() {
		return getIri(rdf.createIRI(PROV.wasRevisionOf));
	}

	@Override
	public Optional<IRI> getSameAs() {
		return getIri(rdf.createIRI(OWL.sameAs));
	}

	@Override
	public void setSameAs(IRI subject) {
		set(rdf.createIRI(OWL.sameAs), subject);		
	}

	@Override
	public void addType(IRI type) {
		add(rdf.createIRI(Rdf.type), type);
	}

	
	@Override
	public Set<IRI> getTypes() {
		return getIris(rdf.createIRI(Rdf.type));
	}

//	@Override
//	public Optional<ActivityPubObject> getResource(IRI property, boolean removeItself) {
//		// TODO Auto-generated method stub
//		return Optional.empty();
//	}
//
//	@Override
//	public Set<ActivityPubObject> getResources(IRI property, boolean removeItself) {
//		// TODO Auto-generated method stub
//		return null;
//	}

	protected void set(IRI predicate, Set<IRI> terms) {
		graph.remove(subject, predicate, null);
		terms.forEach(val->add(predicate, val));
	}

	protected void set(IRI predicate, RDFTerm term) {
		graph.remove(subject, predicate, null);
		if(term != null) {
			add(predicate, term);
		}
	}
	
	protected void add(IRI predicate, RDFTerm term) {
		graph.add(subject, predicate, term);	
	}	

	protected Literal fromInstant(Instant instant) {
		ZonedDateTime zdt = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
		String isiIOnstantString = DateTimeFormatter.ISO_INSTANT.format(zdt);		
        return rdf.createLiteral(isiIOnstantString, rdf.createIRI("http://www.w3.org/2001/XMLSchema#dateTime"));
	}
	
	protected Instant toInstant(Literal dateTimeLiteral) {
		if(!rdf.createIRI("http://www.w3.org/2001/XMLSchema#dateTime").equals(dateTimeLiteral.getDatatype())) {
			throw new IllegalStateException("unsupported Datatype: " + dateTimeLiteral.getDatatype());
		}
		String dateString = dateTimeLiteral.getLexicalForm();
		return Instant.parse(dateString);
	}

	protected void remove(IRI subject) {
		graph.remove(subject, null, null);
	}

	protected void remove(IRI subject, IRI predicate) {
		graph.remove(subject, predicate, null);
	}

	protected void removeAll(Collection<IRI> subjects) {
		subjects.forEach(this::remove);
	}

	@Override
	public Graph asGraph() {
		return graph;
	}

	@Override
	public String toString() {
		return graphToStringConverter.convert(RdfFormat.TURTLE, graph);
	}
	
	@Override
	public String toString(RdfFormat format) {
		return graphToStringConverter.convert(format, graph);
	}

	@Override
	public String toString(RdfFormat format, boolean resolveIris) {
		return graphToStringConverter.convert(format, graph, resolveIris);
	}

	protected void add(Graph graphParam) {
		graphParam.stream()
			.forEach(graph::add);
	}

	@Override
	public ActivityPubObjectConvertable asConvertable() {
		return new ActivityPubObjectConvertableDefault(rdf, graphToStringConverter, this, subjectProvider, vocabularies, instanceProperties);
	}
	
	protected Optional<String> getStringLiteral(String predicate) {
		return graph.stream(subject, rdf.createIRI(predicate), null).findFirst().map(Triple::getObject).map(RDFTerm::toString);
	}

	@Override
	public boolean isActivity() {
		return false;
	}
	
	protected IRI replaceSubject(Actor actor) {
		IRI newActivitySubject = subjectProvider.provide(actor);
		replaceSubject(getSubject(), newActivitySubject);
		this.subject = newActivitySubject;
		return newActivitySubject;
	}
	
	protected void replaceSubject(IRI subject, IRI newSubject) {		
		Set<Triple> withSubject = graph.stream(subject, null, null)
				.collect(Collectors.toSet());
		Set<Triple> withReferences = graph.stream(null, null, subject)
				.collect(Collectors.toSet());

		withSubject.forEach(graph::remove);
		withReferences.forEach(graph::remove);
		
		withSubject.forEach(t->graph.add(newSubject, t.getPredicate(), t.getObject()));
		withReferences.forEach(t->graph.add(t.getSubject(), t.getPredicate(), newSubject));		
	}

	public void partialUpdate(ActivityPubObject newOne) {
		Map<IRI, Set<RDFTerm>> props = new HashMap<>();
		graphAsStream(newOne).forEach(triple->{
			if(!props.containsKey(triple.getPredicate())) {
				props.put(triple.getPredicate(), new HashSet<>());
			}
			Set<RDFTerm> terms = props.get(triple.getPredicate());			
			terms.add(triple.getObject());
		});
		props.keySet().forEach(predicate->{
			this.remove(newOne.getSubject(), predicate);
		});
		
		props.forEach((k,v)->{
			v.stream().forEach(term->this.add(k, term));
		});
//		System.out.println("this: " + this);
	}
	
	private Stream<? extends Triple> graphAsStream(ActivityPubObject newOne) {
		return  newOne.asGraph()
				.stream(newOne.getSubject(), null, null)
				.collect(Collectors.toSet()) // because internal the use parallelStream !
				.stream();
	}
}