1 /** 2 This is the `processPileUps` command of `dentist`. 3 4 Copyright: © 2018 Arne Ludwig <arne.ludwig@posteo.de> 5 License: Subject to the terms of the MIT license, as written in the 6 included LICENSE file. 7 Authors: Arne Ludwig <arne.ludwig@posteo.de> 8 */ 9 module dentist.commands.processPileUps; 10 11 import dentist.commandline : DentistCommand, OptionsFor; 12 import dentist.commands.processPileUps.cropper : CropOptions, cropPileUp; 13 import dentist.common : 14 ReferenceInterval, 15 ReferencePoint, 16 ReferenceRegion; 17 import dentist.common.alignments : 18 AlignmentChain, 19 getAlignmentRefs, 20 getType, 21 isExtension, 22 makeJoin, 23 PileUp, 24 ReadAlignment; 25 import dentist.common.binio : 26 CompressedSequence, 27 InsertionDb, 28 PileUpDb; 29 import dentist.common.insertions : 30 Insertion, 31 InsertionInfo, 32 SpliceSite; 33 import dentist.common.scaffold : ContigNode, getDefaultJoin; 34 import dentist.util.log; 35 import dentist.dazzler : 36 attachTracePoints, 37 getConsensus, 38 getFastaSequences, 39 readMask; 40 import std.algorithm : 41 equal, 42 filter, 43 find, 44 joiner, 45 map, 46 maxElement, 47 merge, 48 sort, 49 uniq; 50 import std.array : array; 51 import std.conv : to; 52 import std.exception : enforce; 53 import std.parallelism : parallel, taskPool; 54 import std.range : enumerate, evenChunks, only, zip; 55 import std.typecons : Yes; 56 import vibe.data.json : toJson = serializeToJson; 57 58 59 /// Options for the `processPileUps` command. 60 alias Options = OptionsFor!(DentistCommand.processPileUps); 61 62 /// Execute the `processPileUps` command with `options`. 63 void execute(Options)(in Options options) 64 { 65 auto processor = new PileUpProcessor(options); 66 67 processor.run(); 68 } 69 70 /// This class comprises the `processPileUps` step of the `dentist` algorithm 71 class PileUpProcessor 72 { 73 protected const Options options; 74 protected PileUp[] pileUps; 75 ReferenceRegion repeatMask; 76 Insertion[] insertions; 77 78 this(in ref Options options) 79 { 80 this.options = options; 81 this.insertions.length = options.pileUpBatchSize; 82 } 83 84 void run() 85 { 86 mixin(traceExecution); 87 88 readPileUps(); 89 readRepeatMask(); 90 91 foreach (i, pileUp; parallel(pileUps)) 92 { 93 fetchTracePoints(pileUp); 94 95 processPileUp(pileUp, &insertions[i]); 96 } 97 98 insertions.sort(); 99 dropEmptyInsertions(); 100 debug logJsonDebug("insertions", insertions 101 .map!(ins => [ 102 "start": [ 103 "contigId": ins.start.contigId.toJson, 104 "contigPart": ins.start.contigPart.to!string.toJson, 105 ], 106 "end": [ 107 "contigId": ins.end.contigId.toJson, 108 "contigPart": ins.end.contigPart.to!string.toJson, 109 ], 110 "payload": [ 111 "sequence": ins.payload.sequence.to!string.toJson, 112 "contigLength": ins.payload.contigLength.toJson, 113 "spliceSites": ins.payload.spliceSites.toJson, 114 ], 115 ]) 116 .array 117 .toJson); 118 119 writeInsertions(); 120 } 121 122 protected void readPileUps() 123 { 124 mixin(traceExecution); 125 126 auto pileUpDb = PileUpDb.parse(options.pileUpsFile); 127 pileUps = pileUpDb[options.pileUpBatch[0] .. options.pileUpBatch[1]]; 128 } 129 130 protected void readRepeatMask() 131 { 132 mixin(traceExecution); 133 134 repeatMask = ReferenceRegion(readMask!ReferenceInterval( 135 options.refDb, 136 options.repeatMask, 137 options.workdir, 138 )); 139 140 foreach (mask; options.additionalMasks) 141 repeatMask |= ReferenceRegion(readMask!ReferenceInterval( 142 options.refDb, 143 mask, 144 options.workdir, 145 )); 146 } 147 148 protected ref PileUp fetchTracePoints(ref PileUp pileUp) 149 { 150 mixin(traceExecution); 151 152 auto allAlignmentChains = pileUp.getAlignmentRefs(); 153 allAlignmentChains.sort!"*a < *b"; 154 allAlignmentChains.attachTracePoints( 155 options.refDb, 156 options.readsDb, 157 options.readsAlignmentFile, 158 options.tracePointDistance, 159 options.workdir 160 ); 161 162 return pileUp; 163 } 164 165 protected void processPileUp(PileUp pileUp, Insertion* resultInsertion) const 166 { 167 mixin(traceExecution); 168 169 try 170 { 171 if (pileUp.length < options.minReadsPerPileUp) 172 { 173 logJsonInfo( 174 "info", "skipping pile up due to `minReadsPerPileUp`", 175 "reason", "minReadsPerPileUp", 176 "pileUp", [ 177 "type": pileUp.getType.to!string.toJson, 178 "length": pileUp.length.toJson, 179 "contigIds": pileUp 180 .map!(ra => ra[].map!"a.contigA.id".array) 181 .joiner 182 .array 183 .sort 184 .uniq 185 .array 186 .toJson, 187 ], 188 ); 189 190 return; 191 } 192 193 auto croppingResult = cropPileUp(pileUp, repeatMask, CropOptions( 194 options.readsDb, 195 options.tracePointDistance, 196 options.workdir, 197 )); 198 auto referenceReadIdx = bestReadAlignmentIndex(pileUp, croppingResult.referencePositions); 199 auto referenceRead = pileUp[referenceReadIdx]; 200 assert(referenceRead.length == croppingResult.referencePositions.length); 201 202 auto consensusDb = getConsensus( 203 croppingResult.db, 204 referenceReadIdx + 1, 205 options.consensusOptions 206 ); 207 auto insertSequences = getFastaSequences(consensusDb, only(1), options.workdir); 208 enforce!Exception(!insertSequences.empty, "consensus could not be computed"); 209 auto insertSequence = insertSequences.front; 210 211 if ( 212 pileUp.isExtension && 213 shouldSkipShortExtension(croppingResult, insertSequence, referenceRead) 214 ) 215 { 216 logJsonInfo( 217 "info", "skipping pile up due to `minExtensionLength`", 218 "reason", "minExtensionLength", 219 "pileUp", [ 220 "type": pileUp.getType.to!string.toJson, 221 "length": pileUp.length.toJson, 222 "contigIds": pileUp 223 .map!(ra => ra[].map!"a.contigA.id".array) 224 .joiner 225 .array 226 .sort 227 .uniq 228 .array 229 .toJson, 230 ], 231 ); 232 233 return; 234 } 235 236 auto compressedSequence = CompressedSequence.from(insertSequence); 237 *resultInsertion = makeInsertions( 238 referenceRead, 239 compressedSequence, 240 croppingResult.referencePositions, 241 ); 242 } 243 catch(Exception e) 244 { 245 logJsonWarn( 246 "info", "skipping pile up due to errors", 247 "reason", "error", 248 "error", e.message().to!string, 249 "pileUp", [ 250 "type": pileUp.getType.to!string.toJson, 251 "length": pileUp.length.toJson, 252 "contigIds": pileUp 253 .map!(ra => ra[].map!"a.contigA.id".array) 254 .joiner 255 .array 256 .sort 257 .uniq 258 .array 259 .toJson, 260 ], 261 ); 262 } 263 } 264 265 protected void dropEmptyInsertions() 266 { 267 insertions = insertions.find!(ins => ins.start.contigId != 0); 268 } 269 270 protected void writeInsertions() 271 { 272 mixin(traceExecution); 273 274 InsertionDb.write(options.insertionsFile, insertions); 275 } 276 277 protected size_t bestReadAlignmentIndex( 278 in PileUp pileUp, 279 in ReferencePoint[] referencePositions, 280 ) const pure 281 { 282 // NOTE pileUp is not modified but the read alignments need to be assignable. 283 return (cast(PileUp) pileUp) 284 .enumerate 285 .filter!(enumReadAlignment => 286 enumReadAlignment.value.length == referencePositions.length && 287 enumReadAlignment.value[].map!"a.contigA.id".equal(referencePositions.map!"a.contigId")) 288 .maxElement!(enumReadAlignment => enumReadAlignment.value.meanScore) 289 .index; 290 } 291 292 protected bool shouldSkipShortExtension(T)( 293 T croppingResult, 294 in string insertSequence, 295 in ReadAlignment referenceRead, 296 ) const 297 { 298 assert(croppingResult.referencePositions.length == 1); 299 300 auto refPos = croppingResult.referencePositions[0].value; 301 auto refLength = referenceRead[0].contigA.length; 302 ulong extensionLength = referenceRead.isFrontExtension 303 ? insertSequence.length.to!ulong - refPos.to!ulong 304 : insertSequence.length.to!ulong - (refLength - refPos).to!ulong; 305 306 return extensionLength < options.minExtensionLength; 307 } 308 309 protected static Insertion makeInsertions( 310 ReadAlignment referenceRead, 311 CompressedSequence compressedSequence, 312 ReferencePoint[] referencePositions, 313 ) 314 { 315 auto insertion = makeJoin!Insertion(referenceRead); 316 insertion.payload = InsertionInfo( 317 compressedSequence, 318 0, 319 zip(referencePositions, referenceRead[].map!"a.seed", referenceRead[].map!"a.flags") 320 .map!(spliceSite => SpliceSite(spliceSite.expand)) 321 .array, 322 ); 323 324 return insertion; 325 } 326 }