1 /**
2     This is the `processPileUps` command of `dentist`.
3 
4     Copyright: © 2018 Arne Ludwig <arne.ludwig@posteo.de>
5     License: Subject to the terms of the MIT license, as written in the
6              included LICENSE file.
7     Authors: Arne Ludwig <arne.ludwig@posteo.de>
8 */
9 module dentist.commands.processPileUps;
10 
11 import dentist.commandline : DentistCommand, OptionsFor;
12 import dentist.commands.processPileUps.cropper : CropOptions, cropPileUp;
13 import dentist.common :
14     ReferenceInterval,
15     ReferencePoint,
16     ReferenceRegion;
17 import dentist.common.alignments :
18     AlignmentChain,
19     getAlignmentRefs,
20     getType,
21     isExtension,
22     makeJoin,
23     PileUp,
24     ReadAlignment;
25 import dentist.common.binio :
26     CompressedSequence,
27     InsertionDb,
28     PileUpDb;
29 import dentist.common.insertions :
30     Insertion,
31     InsertionInfo,
32     SpliceSite;
33 import dentist.common.scaffold : ContigNode, getDefaultJoin;
34 import dentist.util.log;
35 import dentist.dazzler :
36     attachTracePoints,
37     getConsensus,
38     getFastaSequences,
39     readMask;
40 import std.algorithm :
41     equal,
42     filter,
43     find,
44     joiner,
45     map,
46     maxElement,
47     merge,
48     sort,
49     uniq;
50 import std.array : array;
51 import std.conv : to;
52 import std.exception : enforce;
53 import std.parallelism : parallel, taskPool;
54 import std.range : enumerate, evenChunks, only, zip;
55 import std.typecons : Yes;
56 import vibe.data.json : toJson = serializeToJson;
57 
58 
59 /// Options for the `processPileUps` command.
60 alias Options = OptionsFor!(DentistCommand.processPileUps);
61 
62 /// Execute the `processPileUps` command with `options`.
63 void execute(Options)(in Options options)
64 {
65     auto processor = new PileUpProcessor(options);
66 
67     processor.run();
68 }
69 
70 /// This class comprises the `processPileUps` step of the `dentist` algorithm
71 class PileUpProcessor
72 {
73     protected const Options options;
74     protected PileUp[] pileUps;
75     ReferenceRegion repeatMask;
76     Insertion[] insertions;
77 
78     this(in ref Options options)
79     {
80         this.options = options;
81         this.insertions.length = options.pileUpBatchSize;
82     }
83 
84     void run()
85     {
86         mixin(traceExecution);
87 
88         readPileUps();
89         readRepeatMask();
90 
91         foreach (i, pileUp; parallel(pileUps))
92         {
93             fetchTracePoints(pileUp);
94 
95             processPileUp(pileUp, &insertions[i]);
96         }
97 
98         insertions.sort();
99         dropEmptyInsertions();
100         debug logJsonDebug("insertions", insertions
101             .map!(ins => [
102                 "start": [
103                     "contigId": ins.start.contigId.toJson,
104                     "contigPart": ins.start.contigPart.to!string.toJson,
105                 ],
106                 "end": [
107                     "contigId": ins.end.contigId.toJson,
108                     "contigPart": ins.end.contigPart.to!string.toJson,
109                 ],
110                 "payload": [
111                     "sequence": ins.payload.sequence.to!string.toJson,
112                     "contigLength": ins.payload.contigLength.toJson,
113                     "spliceSites": ins.payload.spliceSites.toJson,
114                 ],
115             ])
116             .array
117             .toJson);
118 
119         writeInsertions();
120     }
121 
122     protected void readPileUps()
123     {
124         mixin(traceExecution);
125 
126         auto pileUpDb = PileUpDb.parse(options.pileUpsFile);
127         pileUps = pileUpDb[options.pileUpBatch[0] .. options.pileUpBatch[1]];
128     }
129 
130     protected void readRepeatMask()
131     {
132         mixin(traceExecution);
133 
134         repeatMask = ReferenceRegion(readMask!ReferenceInterval(
135             options.refDb,
136             options.repeatMask,
137             options.workdir,
138         ));
139 
140         foreach (mask; options.additionalMasks)
141             repeatMask |= ReferenceRegion(readMask!ReferenceInterval(
142                 options.refDb,
143                 mask,
144                 options.workdir,
145             ));
146     }
147 
148     protected ref PileUp fetchTracePoints(ref PileUp pileUp)
149     {
150         mixin(traceExecution);
151 
152         auto allAlignmentChains = pileUp.getAlignmentRefs();
153         allAlignmentChains.sort!"*a < *b";
154         allAlignmentChains.attachTracePoints(
155             options.refDb,
156             options.readsDb,
157             options.readsAlignmentFile,
158             options.tracePointDistance,
159             options.workdir
160         );
161 
162         return pileUp;
163     }
164 
165     protected void processPileUp(PileUp pileUp, Insertion* resultInsertion) const
166     {
167         mixin(traceExecution);
168 
169         try
170         {
171             if (pileUp.length < options.minReadsPerPileUp)
172             {
173                 logJsonInfo(
174                     "info", "skipping pile up due to `minReadsPerPileUp`",
175                     "reason", "minReadsPerPileUp",
176                     "pileUp", [
177                         "type": pileUp.getType.to!string.toJson,
178                         "length": pileUp.length.toJson,
179                         "contigIds": pileUp
180                             .map!(ra => ra[].map!"a.contigA.id".array)
181                             .joiner
182                             .array
183                             .sort
184                             .uniq
185                             .array
186                             .toJson,
187                     ],
188                 );
189 
190                 return;
191             }
192 
193             auto croppingResult = cropPileUp(pileUp, repeatMask, CropOptions(
194                 options.readsDb,
195                 options.tracePointDistance,
196                 options.workdir,
197             ));
198             auto referenceReadIdx = bestReadAlignmentIndex(pileUp, croppingResult.referencePositions);
199             auto referenceRead = pileUp[referenceReadIdx];
200             assert(referenceRead.length == croppingResult.referencePositions.length);
201 
202             auto consensusDb = getConsensus(
203                 croppingResult.db,
204                 referenceReadIdx + 1,
205                 options.consensusOptions
206             );
207             auto insertSequences = getFastaSequences(consensusDb, only(1), options.workdir);
208             enforce!Exception(!insertSequences.empty, "consensus could not be computed");
209             auto insertSequence = insertSequences.front;
210 
211             if (
212                 pileUp.isExtension &&
213                 shouldSkipShortExtension(croppingResult, insertSequence, referenceRead)
214             )
215             {
216                 logJsonInfo(
217                     "info", "skipping pile up due to `minExtensionLength`",
218                     "reason", "minExtensionLength",
219                     "pileUp", [
220                         "type": pileUp.getType.to!string.toJson,
221                         "length": pileUp.length.toJson,
222                         "contigIds": pileUp
223                             .map!(ra => ra[].map!"a.contigA.id".array)
224                             .joiner
225                             .array
226                             .sort
227                             .uniq
228                             .array
229                             .toJson,
230                     ],
231                 );
232 
233                 return;
234             }
235 
236             auto compressedSequence = CompressedSequence.from(insertSequence);
237             *resultInsertion = makeInsertions(
238                 referenceRead,
239                 compressedSequence,
240                 croppingResult.referencePositions,
241             );
242         }
243         catch(Exception e)
244         {
245             logJsonWarn(
246                 "info", "skipping pile up due to errors",
247                 "reason", "error",
248                 "error", e.message().to!string,
249                 "pileUp", [
250                     "type": pileUp.getType.to!string.toJson,
251                     "length": pileUp.length.toJson,
252                     "contigIds": pileUp
253                         .map!(ra => ra[].map!"a.contigA.id".array)
254                         .joiner
255                         .array
256                         .sort
257                         .uniq
258                         .array
259                         .toJson,
260                 ],
261             );
262         }
263     }
264 
265     protected void dropEmptyInsertions()
266     {
267         insertions = insertions.find!(ins => ins.start.contigId != 0);
268     }
269 
270     protected void writeInsertions()
271     {
272         mixin(traceExecution);
273 
274         InsertionDb.write(options.insertionsFile, insertions);
275     }
276 
277     protected size_t bestReadAlignmentIndex(
278         in PileUp pileUp,
279         in ReferencePoint[] referencePositions,
280     ) const pure
281     {
282         // NOTE pileUp is not modified but the read alignments need to be assignable.
283         return (cast(PileUp) pileUp)
284             .enumerate
285             .filter!(enumReadAlignment =>
286                 enumReadAlignment.value.length == referencePositions.length &&
287                 enumReadAlignment.value[].map!"a.contigA.id".equal(referencePositions.map!"a.contigId"))
288             .maxElement!(enumReadAlignment => enumReadAlignment.value.meanScore)
289             .index;
290     }
291 
292     protected bool shouldSkipShortExtension(T)(
293         T croppingResult,
294         in string insertSequence,
295         in ReadAlignment referenceRead,
296     ) const
297     {
298         assert(croppingResult.referencePositions.length == 1);
299 
300         auto refPos = croppingResult.referencePositions[0].value;
301         auto refLength = referenceRead[0].contigA.length;
302         ulong extensionLength = referenceRead.isFrontExtension
303             ? insertSequence.length.to!ulong - refPos.to!ulong
304             : insertSequence.length.to!ulong - (refLength - refPos).to!ulong;
305 
306         return extensionLength < options.minExtensionLength;
307     }
308 
309     protected static Insertion makeInsertions(
310         ReadAlignment referenceRead,
311         CompressedSequence compressedSequence,
312         ReferencePoint[] referencePositions,
313     )
314     {
315         auto insertion = makeJoin!Insertion(referenceRead);
316         insertion.payload = InsertionInfo(
317             compressedSequence,
318             0,
319             zip(referencePositions, referenceRead[].map!"a.seed", referenceRead[].map!"a.flags")
320                 .map!(spliceSite => SpliceSite(spliceSite.expand))
321                 .array,
322         );
323 
324         return insertion;
325     }
326 }