StoreDefault.java

package org.linkedopenactors.rdfpub.store.rdf4j;

import static org.eclipse.rdf4j.model.util.Values.iri;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.apache.commons.rdf.api.BlankNodeOrIRI;
import org.apache.commons.rdf.api.Dataset;
import org.apache.commons.rdf.api.Graph;
import org.apache.commons.rdf.api.IRI;
import org.apache.commons.rdf.api.Quad;
import org.apache.commons.rdf.api.RDFTerm;
import org.apache.commons.rdf.api.Triple;
import org.apache.commons.rdf.rdf4j.RDF4J;
import org.apache.commons.rdf.rdf4j.RDF4JBlankNodeOrIRI;
import org.apache.commons.rdf.rdf4j.RDF4JDataset;
import org.apache.commons.rdf.rdf4j.RDF4JGraphLike;
import org.apache.commons.rdf.rdf4j.RDF4JIRI;
import org.eclipse.rdf4j.model.Model;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.impl.TreeModel;
import org.eclipse.rdf4j.model.util.RDFContainers;
import org.eclipse.rdf4j.model.util.Values;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.linkedopenactors.rdfpub.domain.commonsrdf.RdfFormat;
import org.linkedopenactors.rdfpub.domain.commonsrdf.vocab.OWL;
import org.linkedopenactors.rdfpub.domain.commonsrdf.vocab.Rdf;
import org.linkedopenactors.rdfpub.store.Store;

import lombok.extern.slf4j.Slf4j;

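/**
 * Default {@link Store} implementation backed by an Apache Commons RDF {@link Dataset} on top of RDF4J.
 * Objects are stored per named graph; the revisions of an object are kept in an rdf:Bag whose subject
 * acts as a stable pointer to the latest revision.
 */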
@Slf4j
class StoreDefault implements Store {
	static final org.slf4j.Logger graphlog = org.slf4j.LoggerFactory.getLogger(StoreDefault.class.getName() + "_graphlog");
	static final org.slf4j.Logger graphlogStore = org.slf4j.LoggerFactory.getLogger(StoreDefault.class.getName() + "_graphlog_store");
	private Dataset dataset;
	private RDF4J rdf;
	private IRI owner;

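	/**
	 * @param rdf the RDF4J backed commons-rdf factory used to create graphs and IRIs
	 * @param dataset the dataset this store operates on
	 * @param owner the IRI identifying the owner of this store
	 */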
	public StoreDefault(RDF4J rdf, RDF4JDataset dataset, IRI owner) {
		this.rdf = rdf;
		this.dataset = dataset;
		this.owner = owner;
	}

	@Override
	public void add(BlankNodeOrIRI graphName, Graph graph, String logMsg) {
		if(log.isDebugEnabled()) {
			log.debug("add(graphName:"+graphName+",graph)");
			logGraph(graphName, graph, logMsg, true);
		}
			
		// TODO problem with that loop!!! See PR https://github.com/apache/commons-rdf/pull/205
		graph.stream().forEach(triple->{
			dataset.add(graphName, triple.getSubject(), triple.getPredicate(), triple.getObject());
		});
	}

	@Override
	public void add(BlankNodeOrIRI graphName, Set<Triple> newTriples, String logMsg) {
		if(log.isDebugEnabled()) {
			log.debug("add(graphName:"+graphName+",newTriples)");
			logGraph(graphName, newTriples, logMsg, true);
		}
		// TODO problem with that loop!!! See PR https://github.com/apache/commons-rdf/pull/205
		newTriples.stream().forEach(triple->{
			dataset.add(graphName, triple.getSubject(), triple.getPredicate(), triple.getObject());
		});
	}

	@Override
	public void remove(BlankNodeOrIRI graphName, Set<BlankNodeOrIRI> subjects, String logMsg) {
		subjects.stream().forEach(subject->{
			dataset.remove(Optional.of(graphName), subject, null, null);
		});
	}

	/**
	 * Info: the management of the internal revisions while saving is done in {@link AbstractHandler}. 
	 */
	@Override
	public Optional<Graph> find(BlankNodeOrIRI graphName, BlankNodeOrIRI subject) {
		return find(graphName, subject, 0);
	}
	
	@Override
	public Optional<Graph> find(BlankNodeOrIRI graphName, BlankNodeOrIRI subject, int deep) {
		if(deep>1) {
			throw new UnsupportedOperationException("only deep == 0 or deep == 1 is currently supported.");
		}
		log.debug("find("+graphName+", "+subject+", "+deep+")");
		Optional<Graph> graphOpt = find(graphName, subject, null, null)
				.flatMap(graph->checkIfItsABag(graphName, subject, graph));
		if(deep==1) {
			if(log.isDebugEnabled()) {
				// Log the immediate callers, to make it easier to see who requested a deep find.
				StackTraceElement[] stackTrace = new RuntimeException().getStackTrace();
				for (int i = 0; i < Math.min(5, stackTrace.length); i++) {
					log.debug("coming from: " + stackTrace[i].getClassName() + "#" + stackTrace[i].getMethodName() + "(" + stackTrace[i].getLineNumber() + ")");
				}
			}

			graphOpt.ifPresent(g->logGraph(graphName, g, "before resolve references"));
			graphOpt = graphOpt.map(graph->resolveReferences(graphName, graph));
			graphOpt.ifPresent(g->logGraph(graphName, g, "after resolve references"));
		}			
		return graphOpt;
	}

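	/**
	 * Resolves internal references: for every object of the graph that is an IRI
	 * of an internal resource, the referenced graph is loaded and merged into the
	 * passed graph.
	 */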
	private Graph resolveReferences(BlankNodeOrIRI graphName, Graph graph) {
		log.debug("resolveReferences("+graphName+", graph)");
		
		
//		AtomicInteger i = new AtomicInteger(0);
		Map<String, RDFTerm> m = graph.stream()
				.collect(Collectors.toMap(e->UUID.randomUUID().toString() + "|" + e.getPredicate().toString(), Triple::getObject));
		
		
		Map<String, IRI> filtered = m.entrySet().stream()
				.filter(entry->IRI.class.isInstance(entry.getValue()))
				.filter(entry->isInternal(entry.getValue().toString()))
				.collect(Collectors.toMap(
						e -> e.getKey(),
						e -> IRI.class.cast(e.getValue())));

		// Load the graph of each referenced internal resource and merge it into this graph.
		filtered.forEach((k, v) -> {
			log.debug("resolving: " + k + " -> " + v );
			Optional<Graph> gf = find(graphName, v, null, null).flatMap(gr -> checkIfItsABag(graphName, v, gr));
//			log.debug("resolve reference: " + v);
//			log.debug("resolved reference: " + gf);
			gf.ifPresent(gr -> {
				gr.stream().forEach(graph::add);
			});
		});		
		return graph;
	}
	
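	/**
	 * Heuristic: IRIs that do not start with "http" are treated as internal identifiers.
	 */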
	private boolean isInternal(String iri) {
		return !iri.startsWith("http");
	}
	
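	/**
	 * If the subject denotes a {@link Rdf#Bag} (i.e. a revision collection), the
	 * graph of the latest revision is returned instead of the passed graph.
	 */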
	private Optional<Graph> checkIfItsABag(BlankNodeOrIRI graphName, BlankNodeOrIRI subject, Graph graph) {
		
		if(isBag(subject, graph)) {
			log.debug("checkIfItsABag: YES");
//			logGraph(graphName, graph, "checkIfItsABag");
			Optional<Graph> latest = findLatestRevision(graphName, (IRI)subject, true);				
			return latest;
		} else {
			log.debug("checkIfItsABag: NO");
//			logGraph(graphName, graph, "checkIfItsABag");
		}
		return Optional.of(graph);
	}

	@Override
	public Optional<Graph> find(BlankNodeOrIRI graphName, BlankNodeOrIRI subject, IRI predicate, RDFTerm object) {
		Graph graph = rdf.createGraph();
		try(Stream<? extends Quad> stream = dataset.stream(Optional.ofNullable(graphName), subject, predicate, object)) {
			stream.forEach(quad->graph.add(quad.getSubject(), quad.getPredicate(), quad.getObject()));
		}
		if(graph.size() < 1) {
			trace(graphName);
			return Optional.empty();
		} else {
			logGraph(graphName, graph, "find ["+subject+"]");
			return Optional.of(graph);
		}
	}

	private void trace(BlankNodeOrIRI graphName) {
//		try(Stream<? extends Quad> stream = dataset.stream(Optional.ofNullable(graphName), null, null, null)) {
//			stream.forEach(quad->System.out.println("TRACE:: "+quad.getSubject() +" - " + quad.getPredicate() +" - " + quad.getObject()));
//		}
	}
	
	@Override
	public Map<BlankNodeOrIRI, Graph> findAll(BlankNodeOrIRI graphName, List<BlankNodeOrIRI> subjects) {
		Map<BlankNodeOrIRI, Graph> result = new HashMap<>();
		subjects.forEach(sub->{
			find(graphName, sub)
				.ifPresent(graph->result.put(sub, graph));
		});			
		return result;
	}

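	/**
	 * Loads the graph of the last item of the revision collection. If
	 * useCollectionSubject is true, the revision subject is replaced by the
	 * collection subject and an owl:sameAs statement pointing to the revision
	 * subject is added.
	 */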
	private Optional<Graph> findLatestRevision(BlankNodeOrIRI graphName, IRI collectionSubject, boolean useCollectionSubject) {
		Optional<Graph> result = Optional.empty();
		List<IRI> items = getCollection(graphName, collectionSubject);
		if(!items.isEmpty()) {
			IRI revisionSubject = items.get(items.size()-1);
			result = find(graphName, revisionSubject);
			if(useCollectionSubject) {
				result = replaceSubject(result.orElse(null), revisionSubject, collectionSubject);
				result.ifPresent(r->r.add(collectionSubject, rdf.createIRI(OWL.sameAs), revisionSubject));
			}
			result.ifPresent(g->logGraph(graphName, g, "getLatest"));
		}
		return result;
	}

	@Override
	public Optional<IRI> findLatestRevisionSubject(BlankNodeOrIRI graphName, IRI collectionSubject) {
		Optional<IRI> result = Optional.empty();
		List<IRI> items = getCollection(graphName, collectionSubject);
		if(!items.isEmpty()) {
			result = Optional.ofNullable(items.get(items.size()-1));
		}
		return result;
	}

	@Override
	public Optional<Graph> find(BlankNodeOrIRI graphName) {
		// ATTENTION
		// The dataset is an org.apache.commons.rdf.rdf4j.impl.RepositoryDatasetImpl and returns an
		// org.apache.commons.rdf.rdf4j.impl.RepositoryGraphImpl, which exposes streams backed by an open
		// connection, so the client would have to take care of closing that stream.
		// That seems too dangerous, so it is better to return a decoupled copy.
		Optional<Graph> found = dataset.getGraph(graphName);
		if(found.isPresent()) {
			Graph copy = rdf.createGraph();
			try(Stream<? extends Triple> stream = found.get().stream()) {
				stream.forEach(triple->copy.add(triple));
				return Optional.of(copy);
			}
		} else {
			return Optional.empty();
		}
	}

	/**
	 * Checks whether the passed graph is a {@link Rdf#Bag}.
	 * @param subject
	 * @param graph
	 * @return True, if the passed graph's type is {@link Rdf#Bag}.
	 */
	private boolean isBag(BlankNodeOrIRI subject, Graph graph) {
		return graph.stream(subject, rdf.createIRI(Rdf.type), (RDFTerm)null)
				.map(Triple::getObject)
				.anyMatch(term->{
					RDF4JIRI bagTerm = rdf.createIRI(Rdf.Bag);
					return term.equals(bagTerm);
				});
	}
	
	/**
	 * The latest Object is known to the user under a pointer name. This pointer
	 * points internally to a collection of revisions of a graph, so we have to
	 * replace the internal subject with the pointer.
	 * 
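	 * Example (schematic): every triple {@code (revisionSubject, p, o)} of the
	 * passed graph is re-added as {@code (pointerSubject, p, o)} in the result.
	 * 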
	 * @param graph
	 * @param revisionSubject
	 * @param pointerSubject
	 * @return The passed graph with the pointerSubject instead of the internal subject.
	 */
	private Optional<Graph> replaceSubject(Graph graph, BlankNodeOrIRI revisionSubject, BlankNodeOrIRI pointerSubject) {
		return Optional.ofNullable(graph).map(g->{
			Graph result = rdf.createGraph();
			g.stream(revisionSubject, null, null)
				.forEach(t->result.add(pointerSubject, t.getPredicate(), t.getObject()));
			return result;
		});
	}
		
	public Set<BlankNodeOrIRI> getGraphNames() {
		try(Stream<BlankNodeOrIRI> graphNames = dataset.getGraphNames()) {
			return graphNames.collect(Collectors.toSet());
		}
	}

	@Override
	public void saveCollection(BlankNodeOrIRI graphName, IRI collectionSubject, List<IRI> items, String logMsg) {
		
		log.debug("saveCollection: store: "+owner+" + graphName: " + graphName + "; collectionSubject: " + collectionSubject);
		
		if(exists(graphName, collectionSubject)) {
			throw new IllegalStateException("collection '%s' already exists in graph '%s'!".formatted(collectionSubject, graphName));
		}
		
		List<org.eclipse.rdf4j.model.IRI> rdf4jItems = items.stream()
				.map(i->Values.iri(i.toString()))
				.collect(Collectors.toList());
		
		org.eclipse.rdf4j.model.IRI asValue = ((RDF4JIRI)collectionSubject).asValue();
		Model bag = RDFContainers.toRDF(iri("http://www.w3.org/1999/02/22-rdf-syntax-ns#Bag"), rdf4jItems, 
				asValue, 
				new TreeModel());
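		// The resulting model has (schematically) the shape:
		//   <collectionSubject> rdf:type rdf:Bag ; rdf:_1 <item1> ; rdf:_2 <item2> ; ...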
		
		// TODO use new method -> https://github.com/apache/commons-rdf/pull/205
		Graph bagGraph = new RDF4J().asGraph(bag);
		logGraph(graphName, bagGraph, logMsg, true);
		bagGraph.stream().forEach(t->dataset.add(graphName, t.getSubject(), t.getPredicate(), t.getObject()));
	}

	/**
	 * @param graphName
	 * @param subject
	 * @return True, if the passed subject points to a non-empty graph.
	 */
	private boolean exists(BlankNodeOrIRI graphName, BlankNodeOrIRI subject) {
		try(Graph graph = getGraph(graphName)) {
			return graph.contains(subject, null, null);
		} catch (Exception e) {
			throw new IllegalStateException("unable to get graph", e);
		}
	}
	
	@Override
	public List<IRI> getCollection(BlankNodeOrIRI graphName, BlankNodeOrIRI collectionSubject) {
		log.debug("getCollection(graphName:"+graphName+",collection:"+collectionSubject+")"); 
			try(Stream<? extends Triple> stream = getGraph(graphName).stream(collectionSubject, null, null)){
			List<Triple> triples = stream.collect(Collectors.toList());	
			Graph sub = rdf.createGraph();
			triples.forEach(sub::add);
			@SuppressWarnings({ "rawtypes", "unchecked" })
			Optional<Model> modelOpt = ((RDF4JGraphLike)sub).asModel();
			
			log.debug("getCollection(...) model: " + new GraphToStringConverterDefault(null).convert(RdfFormat.TURTLE, modelOpt.orElse(null)));
			
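			// Read the bag members (rdf:_1, rdf:_2, ...) back from the model in index order.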
			Optional<List<Value>> valueListOpt = modelOpt.map(model->RDFContainers.toValues(org.eclipse.rdf4j.model.vocabulary.RDF.BAG, model, 
					((RDF4JBlankNodeOrIRI)collectionSubject).asValue(), 
					new ArrayList<Value>()));
			return valueListOpt.map(this::toIriList).orElse(new ArrayList<>());
		} catch (Exception e) {
			throw new IllegalStateException("unable to get resource", e);
		}
	}

	@Override
	public Set<IRI> findCollection(BlankNodeOrIRI graphName, IRI memberParam) {
		RDF4JIRI memberPredicate = rdf.createIRI("http://www.w3.org/2000/01/rdf-schema#member");
		try(Stream<? extends Triple> stream = getGraph(graphName).stream(null, memberPredicate, memberParam)){
			return stream
					.map(Triple::getSubject)
					.filter(IRI.class::isInstance)
					.map(IRI.class::cast)
					.collect(Collectors.toSet());
		}
	}

	/**
	 * @param graphName
	 * @return The graph for the passed graphName.
	 */
	private Graph getGraph(BlankNodeOrIRI graphName) {
		return dataset.getGraph(graphName).orElseThrow(()->new IllegalStateException("no graph with name '%s'".formatted(graphName)));
	}
	
	/**
	 * @param values
	 * @return The passed {@link Value}s converted to {@link IRI}s. 
	 */
	private List<IRI> toIriList(List<Value> values)  {
		return values.stream().map(v->rdf.createIRI(v.toString())).collect(Collectors.toList());
	}

	@Override
	public void addItem(BlankNodeOrIRI graphName, IRI collectionSubject, IRI item, String logMsg) {
		
		if(!this.owner.equals(graphName)) {
			throw new IllegalStateException(owner + " != " + graphName);
		}
		
		if(!exists(graphName, collectionSubject)) {
			saveCollection(graphName, collectionSubject, List.of(item), logMsg);
		} else {
			if(log.isDebugEnabled()) {
				log.debug("addItem (store: "+owner+" + graphName:"+graphName+", collection:"+collectionSubject+", item:"+item+")");
			}
			long size = size(graphName, collectionSubject);
			String propIriAsString = "http://www.w3.org/1999/02/22-rdf-syntax-ns#_%s".formatted(size+1);
	
			// TODO how can we add multiple triples/quads ? See https://github.com/apache/commons-rdf/pull/205
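			// rdf:_<n> is the RDF container membership property used for the new item;
			// rdfs:member is added as well, so members can be found without knowing their index (see findCollection).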
			dataset.add(graphName, collectionSubject, rdf.createIRI(propIriAsString), item);
			dataset.add(graphName, collectionSubject, rdf.createIRI("http://www.w3.org/2000/01/rdf-schema#member"), item);
		}
		if(graphlogStore.isDebugEnabled()) {
			graphlogStore.debug("items ("+collectionSubject+"): " + getCollection(graphName, collectionSubject));
		}	
	}
	
	
//	private void logGraph(BlankNodeOrIRI graphName, Set<Triple> triples, String msg) {
//		logGraph(graphName, triples, msg, false);
//	}
	
	/**
	 * Logs the passed graph to the log.
	 * @param graphName
	 * @param triples the graph
	 * @param msg
	 */
	private void logGraph(BlankNodeOrIRI graphName, Set<Triple> triples, String msg, boolean store) {
		Graph g = rdf.createGraph();
		triples.forEach(g::add);
		logGraph(graphName, g, msg, store);
	}

	
	
	private void logGraph(BlankNodeOrIRI graphName, Graph graph, String msg) {
		logGraph(graphName, graph, msg, false);
	}
	/**
	 * Logs the passed graph to the log.
	 * @param graphName
	 * @param graph
	 * @param msg
	 */
	private void logGraph(BlankNodeOrIRI graphName, Graph graph, String msg, boolean store) {
		if(store) {
			if(graphlogStore.isDebugEnabled()) {
				String str = "\n(graphlogStore)#####################################################################################\n";
				str += graphName + "->" + msg + "\n";
				graphlogStore.debug(str + new GraphToStringConverterDefault(null).convert(RdfFormat.TURTLE, graph));
			}
		}
		if(graphlog.isDebugEnabled()) {
			String str = "\n(graphlog)#####################################################################################\n";
			str += graphName + "->" + msg + "\n";
			graphlog.debug(str + new GraphToStringConverterDefault(null).convert(RdfFormat.TURTLE, graph));
		}
	}

//	/**
//	 * Logs the whole graph with the passed graphname to the log.
//	 * @param graphName
//	 * @param msg
//	 */
//	private void logExistingGraph(BlankNodeOrIRI graphName, String msg) {
//		if(graphlog.isDebugEnabled()) {
//			final String str = "\n((graphlog)#####################################################################################\n"
//					+ graphName + "->" + msg + "\n";
//			find(graphName).ifPresentOrElse(graph->{			
//				graphlog.debug(str + new GraphToStringConverterDefault(null).convert(RdfFormat.TURTLE, graph));
//			}, () -> graphlog.debug(msg + " empty"));
//		}
//	}
	
	/**
	 * Determines the number of members of the collection. 
	 * @param graphName
	 * @param collectionSubject
	 * @return The number of members of the collection.
	 */
	private long size(BlankNodeOrIRI graphName, BlankNodeOrIRI collectionSubject) {
		Repository repo = ((RDF4JDataset)dataset).asRepository().orElseThrow();
		try (RepositoryConnection conn = repo.getConnection()) {
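			// Count the rdfs:member statements of the collection subject.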
			Iterable<Statement> statements = conn.getStatements(Values.iri(collectionSubject.toString()), Values.iri("http://www.w3.org/2000/01/rdf-schema#member"), null);
			return StreamSupport.stream(statements.spliterator(), false).count();
		}
	}
	
	@Override
	public List<IRI> subCollection(BlankNodeOrIRI graphName, BlankNodeOrIRI collectionSubject, int offset, int limit) {
		List<IRI> all = getWholeListWithMemoryFootprint(graphName, collectionSubject);
		int end = Math.min(offset + limit, all.size());
		return all.subList(offset, end);
	}
	
	/**
	 * This was an experiment; we still have to check whether it makes sense, or
	 * whether it should be moved to a statistics component.
	 * 
	 * @param graphName
	 * @param collectionSubject
	 * @return All items of the collection.
	 */
	private List<IRI> getWholeListWithMemoryFootprint(BlankNodeOrIRI graphName, BlankNodeOrIRI collectionSubject) {
		List<IRI> all = getCollection(graphName, collectionSubject);
		if(log.isDebugEnabled()) {
			// Serialize the item IRIs to get a rough estimate of the memory footprint of the collection.
			List<String> ss = all.stream().map(Object::toString).collect(Collectors.toList());
			try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
					ObjectOutputStream oos = new ObjectOutputStream(bos)) {
				oos.writeObject(ss);
				oos.flush();
				log.debug("bos (kb): " + (bos.toByteArray().length/1024) + " - items: " + all.size());
			} catch (Exception e) {
				throw new IllegalStateException("unable to determine the memory footprint of the collection", e);
			}
		}
		return all;
	}
}