# Source code for schemist.io

"""Tools to facilitate input and output."""

from typing import Any, Callable, List, Optional, TextIO, Tuple, Union

from collections import defaultdict
from functools import partial
from string import printable
from tempfile import NamedTemporaryFile
from xml.etree import ElementTree

from carabiner import print_err
from carabiner.cast import cast
from carabiner.itertools import tenumerate
from carabiner.pd import read_table, write_stream

from pandas import DataFrame, read_excel
from rdkit.Chem import SDMolSupplier

from .converting import _mol2isomeric_canonical_smiles

def _mutate_df_stream(input_file: Union[str, TextIO], 
                      output_file: Union[str, TextIO], 
                      function: Callable[[DataFrame], Union[DataFrame, Tuple[Any, DataFrame]]],
                      file_format: Optional[str] = None, 
                      chunksize: int = 1000) -> List[Any]:
    """Apply a function to a table chunk by chunk, streaming results to an output.

    The input is read with `read_table` in chunks of `chunksize`. Each chunk
    is passed to `function` and the resulting table is written with
    `write_stream`. The chunk with index 0 is written with a header in mode
    'w'; every later chunk is written without a header in mode 'a'.

    Parameters
    ----------
    input_file : str or TextIO
        Table to read, passed through to `read_table`.
    output_file : str or TextIO
        Destination, passed through to `write_stream`.
    function : Callable
        Called once per chunk. May return either a DataFrame, or a
        ``(carry, DataFrame)`` pair where ``carry`` is any per-chunk value
        to hand back to the caller.
    file_format : str, optional
        Format passed to both `read_table` and `write_stream`.
    chunksize : int
        Chunk size passed to `read_table`. Default: 1000.

    Returns
    -------
    list
        One carry value per chunk, in order. The carry is 0 for chunks where
        `function` returned only a table.

    """
    carries = []

    for i, chunk in tenumerate(
        read_table(
            input_file, 
            format=file_format,
            progress=False,
            chunksize=chunksize,
        )
    ):
        
        result = function(chunk)

        if isinstance(result, DataFrame):
            # Iterating a DataFrame yields its column labels, so a bare
            # two-column frame would "successfully" unpack into two labels.
            # Recognise frames explicitly before attempting to unpack.
            carry, df = 0, result
        else:
            try:
                carry, df = result
            except ValueError:
                # Not a pair: treat the whole result as the table, as before.
                carry, df = 0, result
        
        write_stream(
            df, 
            output=output_file,
            format=file_format,
            header=i == 0,
            mode='w' if i == 0 else 'a',
        )
        
        carries.append(carry)

    return carries


def read_weird_xml(filename: Union[str, TextIO], 
                   header: bool = True, 
                   namespace: str = '{urn:schemas-microsoft-com:office:spreadsheet}') -> DataFrame:
    """Read a spreadsheet-style XML file into a DataFrame.

    Every character that is not in `string.printable` is removed from the
    file contents before parsing. Each ``Row`` element (under `namespace`)
    becomes one table row, and the text of each ``Data`` element inside it
    becomes one cell. Cells are matched to columns by position.

    Parameters
    ----------
    filename : str or TextIO
        File to read; opened for reading via `cast(filename, TextIO, mode='r')`.
    header : bool
        If True, the first row supplies the column names. If False, columns
        are named ``X0``, ``X1``, ... and the first row is kept as data.
        Default: True.
    namespace : str
        Namespace prefix, in ``{uri}`` form, of the ``Row`` and ``Data``
        elements.

    Returns
    -------
    DataFrame
        One column per heading. Rows with fewer cells than the heading are
        padded with None; cells beyond the heading are dropped. An empty
        DataFrame is returned when the document has no rows.

    Raises
    ------
    xml.etree.ElementTree.ParseError
        If the cleaned text is not well-formed XML. A short excerpt around
        the reported error position is printed with `print_err` first.

    """
    with cast(filename, TextIO, mode='r') as f:
        xml_string = ''.join(filter(printable.__contains__, f.read()))

    try:
        root = ElementTree.fromstring(xml_string)
    except ElementTree.ParseError as e:
        # Show the text around the parser's reported (line, column) position.
        # Bounds are checked so the diagnostic can never mask the real error.
        line_number, column = getattr(e, 'position', (0, 0))
        lines = xml_string.split('\n')
        if 0 < line_number <= len(lines):
            start = max(column - 20, 0)
            print_err('\n!!! ' + lines[line_number - 1][start:column + 20])
        raise

    # Initialised up front so a document without rows gives an empty table.
    heading = []
    df = {}

    for i, row in enumerate(root.iter(f'{namespace}Row')):
        this_row = [datum.text for datum in row.iter(f'{namespace}Data')]

        if i == 0:
            if header:
                heading = this_row
                df = {colname: [] for colname in heading}
                continue  # the heading row carries no data
            heading = [f'X{j}' for j, _ in enumerate(this_row)]
            df = {colname: [] for colname in heading}

        # Pad short rows so every column keeps the same length; zip() drops
        # any cells beyond the heading.
        padding = [None] * (len(heading) - len(this_row))
        for colname, datum in zip(heading, this_row + padding):
            df[colname].append(datum)

    return DataFrame(df)
def read_sdf(filename: Union[str, TextIO]) -> DataFrame:
    """Read the molecules of an SDF file into a DataFrame.

    The file is read as text with undecodable bytes replaced, copied to a
    temporary file, and that copy is parsed with `SDMolSupplier`. Each
    molecule contributes one row holding the properties returned by
    ``mol.GetPropsAsDict()`` plus a ``SMILES`` column produced by
    `_mol2isomeric_canonical_smiles`. Entries the supplier yields as None
    are skipped.

    Parameters
    ----------
    filename : str or TextIO
        File to read; converted with `cast(filename, str)` before opening.

    Returns
    -------
    DataFrame
        One row per parsed molecule and one column per property seen in any
        molecule. A property absent from a molecule is None in that row.

    Raises
    ------
    ValueError
        If the collected columns end up with differing lengths.

    """
    filename = cast(filename, str)
    columns = defaultdict(list)
    n_rows = 0

    with open(filename, 'r', errors='replace') as f:
        with NamedTemporaryFile("w") as o:
            o.write(f.read())
            # Push the buffered text to disk before the supplier reopens the
            # file by name.
            o.flush()

            # The supplier is consumed inside the `with` block because the
            # temporary file disappears on exit.
            for mol in SDMolSupplier(o.name):
                if mol is None:
                    continue

                propdict = mol.GetPropsAsDict()
                propdict['SMILES'] = _mol2isomeric_canonical_smiles(mol)

                for colname, value in propdict.items():
                    if colname not in columns:
                        # Property first seen on a later molecule: back-fill
                        # the rows already read so columns stay aligned.
                        columns[colname].extend([None] * n_rows)
                    columns[colname].append(value)

                for colname, values in columns.items():
                    if colname not in propdict:
                        values.append(None)

                n_rows += 1

    # Sanity check: every column should now have exactly one entry per row.
    col_lengths = {col: len(val) for col, val in columns.items()}
    if len(set(col_lengths.values())) > 1:
        raise ValueError("Column lengths not all the same:\n\t" 
                         + '\n\t'.join(f"{key}:{val}" for key, val in col_lengths.items()))

    return DataFrame(columns)
# Maps a file-format name to the callable that reads that format.
FILE_READERS = {
    # Spreadsheet-style XML handled by the tolerant reader above.
    'bad_xml': read_weird_xml,
    # Excel workbooks, read by pandas with the openpyxl engine.
    'xlsx': partial(read_excel, engine='openpyxl'),
    # Structure-data files, one row per molecule.
    'sdf': read_sdf,
}