diff --git a/pipeline/alignments/align.py b/pipeline/alignments/align.py index 5b4b2f2d7..ce63ae5d9 100755 --- a/pipeline/alignments/align.py +++ b/pipeline/alignments/align.py @@ -44,6 +44,11 @@ class Tokenization(Enum): moses = "moses" +class AlignerType(Enum): + eflomal = "eflomal" + fast_align = "fast_align" + + def run( corpus_src: str, corpus_trg: str, @@ -51,6 +56,7 @@ def run( priors_input_path: Optional[str], priors_output_path: Optional[str], tokenization: Tokenization, + aligner_type: AlignerType, ): bin = os.environ["BIN"] src = os.environ["SRC"] @@ -74,10 +80,12 @@ def run( output_aln = output_path fwd_path, rev_path = align( + bin=bin, corpus_src=tokenized_src, corpus_trg=tokenized_trg, priors_input_path=priors_input_path, tmp_dir=tmp_dir, + aligner_type=aligner_type, ) symmetrize(bin=bin, fwd_path=fwd_path, rev_path=rev_path, output_path=output_aln) @@ -109,13 +117,13 @@ def decompress(file_path: str): def align( + bin: str, corpus_src: str, corpus_trg: str, - priors_input_path: Optional[str], tmp_dir: str, + aligner_type: AlignerType, + priors_input_path: Optional[str], ): - import eflomal - logger.info("Splitting corpus into parts") # align in chunks to prevent OOM chunk_lines = 100000000 @@ -125,6 +133,57 @@ def align( fwd_path = os.path.join(tmp_dir, "aln.fwd") rev_path = os.path.join(tmp_dir, "aln.rev") + for src_part in sorted(glob(f"{corpus_src}.*")): + suffix = src_part.split(".")[-1] + logger.info(f"Processing part {suffix}") + + corpus_src_part = f"{corpus_src}.{suffix}" + corpus_trg_part = f"{corpus_trg}.{suffix}" + fwd_path_part = f"{fwd_path}.{suffix}" + rev_path_part = f"{rev_path}.{suffix}" + + if aligner_type == AlignerType.eflomal: + logger.info("Calculating alignments with eflomal...") + align_eflomal( + corpus_src_part, corpus_trg_part, fwd_path_part, rev_path_part, priors_input_path + ) + elif aligner_type == AlignerType.fast_align: + if priors_input_path is not None: + logger.warning("eflomal priors are ignore when aligning with fast_align") + logger.info("Calculating alignments with fast_align...") + align_fast_align( + bin, + corpus_src_part, + corpus_trg_part, + fwd_path_part, + rev_path_part, + os.path.join(tmp_dir, f"corpus.{suffix}"), + ) + else: + raise ValueError(f"Unsupported aligner type: {aligner_type}") + + # Merge alignments parts into one file + with open(fwd_path, "w") as fwd_out: + fwd_parts = sorted(glob(f"{fwd_path}.*")) + logger.info(f"Merging alignments: {fwd_parts}") + subprocess.check_call(["cat"] + fwd_parts, stdout=fwd_out) + with open(rev_path, "w") as rev_out: + rev_parts = sorted(glob(f"{rev_path}.*")) + logger.info(f"Merging alignments: {rev_parts}") + subprocess.check_call(["cat"] + rev_parts, stdout=rev_out) + + return fwd_path, rev_path + + +def align_eflomal( + corpus_src: str, + corpus_trg: str, + fwd_path: str, + rev_path: str, + priors_input_path: Optional[str], +): + import eflomal + with ExitStack() as stack: if priors_input_path: logger.info(f"Using provided priors: {priors_input_path}") @@ -132,40 +191,45 @@ def align( else: priors_input = None - for src_part in sorted(glob(f"{corpus_src}.*")): - suffix = src_part.split(".")[-1] - logger.info(f"Processing part {suffix}") - # We use eflomal aligner. # It is less memory intensive than fast_align. # fast_align failed with OOM in a large white-space tokenized corpus aligner = eflomal.Aligner() + src_input = stack.enter_context(open(corpus_src, "r", encoding="utf-8")) + trg_input = stack.enter_context(open(corpus_trg, "r", encoding="utf-8")) - src_input = stack.enter_context(open(f"{corpus_src}.{suffix}", "r", encoding="utf-8")) - trg_input = stack.enter_context(open(f"{corpus_trg}.{suffix}", "r", encoding="utf-8")) - - logger.info("Calculating alignments...") aligner.align( src_input, trg_input, - links_filename_fwd=f"{fwd_path}.{suffix}", - links_filename_rev=f"{rev_path}.{suffix}", + links_filename_fwd=fwd_path, + links_filename_rev=rev_path, priors_input=priors_input, quiet=False, use_gdb=False, ) - # Merge alignments parts into one file - with open(fwd_path, "w") as fwd_out: - fwd_parts = sorted(glob(f"{fwd_path}.*")) - logger.info(f"Merging alignments: {fwd_parts}") - subprocess.check_call(["cat"] + fwd_parts, stdout=fwd_out) - with open(rev_path, "w") as rev_out: - rev_parts = sorted(glob(f"{rev_path}.*")) - logger.info(f"Merging alignments: {rev_parts}") - subprocess.check_call(["cat"] + rev_parts, stdout=rev_out) - return fwd_path, rev_path +def align_fast_align( + bin: str, corpus_src: str, corpus_trg: str, fwd_path: str, rev_path: str, tmp_file_path: str +) -> None: + logger.info("Creating corpus in fast_align format") + with open(corpus_src) as src_f, open(corpus_trg) as trg_f, open( + tmp_file_path, "w" + ) as corpus_f: + for line_src, line_trg in zip(src_f, trg_f): + corpus_f.write(f"{line_src[:-1]} ||| {line_trg[:-1]}\n") + + logger.info("Calculating forward alignments with fast_align") + with open(fwd_path, "w") as fwd_f: + subprocess.check_call( + [os.path.join(bin, "fast_align"), "-vod", "-i", tmp_file_path], stdout=fwd_f + ) + + logger.info("Calculating reversed alignments with fast_align") + with open(rev_path, "w") as rev_f: + subprocess.check_call( + [os.path.join(bin, "fast_align"), "-vodr", "-i", tmp_file_path], stdout=rev_f + ) def symmetrize(bin: str, fwd_path: str, rev_path: str, output_path: str): @@ -371,6 +435,14 @@ def main() -> None: help="Use the specified tokenization method. Default is `spaces` which means no tokenization will be applied. " "It remaps the alignments back to whitespace tokenized ones if the `moses` tokenization is used.", ) + parser.add_argument( + "--aligner", + metavar="ALIGNER", + type=AlignerType, + choices=list(AlignerType), + default=AlignerType.eflomal, + help="Use the specified aligner. Default is `eflomal`. Input priors are ignored when using fast_align ", + ) args = parser.parse_args() logger.info("Starting generating alignments.") run( @@ -380,6 +452,7 @@ def main() -> None: args.priors_input_path, args.priors_output_path, args.tokenization, + aligner_type=args.aligner, ) logger.info("Finished generating alignments.") diff --git a/pipeline/alignments/generate-shortlist.sh b/pipeline/alignments/generate-shortlist.sh index c55de6520..830aae666 100755 --- a/pipeline/alignments/generate-shortlist.sh +++ b/pipeline/alignments/generate-shortlist.sh @@ -49,7 +49,8 @@ ${COMPRESSION_CMD} -dc "${corpus_trg}" | python3 align.py \ --corpus_src="${dir}/corpus.spm.${SRC}" \ --corpus_trg="${dir}/corpus.spm.${TRG}" \ - --output_path="${output_dir}/corpus.aln" + --output_path="${output_dir}/corpus.aln" \ + --aligner=fast_align echo "### Creating shortlist" "${BIN}/extract_lex" \ diff --git a/taskcluster/kinds/alignments-student/kind.yml b/taskcluster/kinds/alignments-student/kind.yml index ecfdd74a8..fc4edb8fd 100644 --- a/taskcluster/kinds/alignments-student/kind.yml +++ b/taskcluster/kinds/alignments-student/kind.yml @@ -15,7 +15,6 @@ transforms: kind-dependencies: - cefilter - toolchain - - alignments-original tasks: "{src_locale}-{trg_locale}": @@ -78,11 +77,10 @@ tasks: --corpus_trg=$MOZ_FETCHES_DIR/corpus.{trg_locale}.zst --output_path=$TASK_WORKDIR/artifacts/corpus.aln.zst --tokenization=moses - --priors_input_path=$MOZ_FETCHES_DIR/corpus.priors + --aligner=fast_align dependencies: cefilter: cefilter-{src_locale}-{trg_locale} - alignments-original: alignments-original-{src_locale}-{trg_locale} fetches: cefilter: @@ -92,5 +90,3 @@ tasks: - marian - fast-align - extract-lex - alignments-original: - - artifact: corpus.priors