"""Tools to facilitate input and output."""
from typing import Any, Callable, List, Optional, TextIO, Tuple, Union
from collections import defaultdict
from functools import partial
from string import printable
from tempfile import NamedTemporaryFile
from xml.etree import ElementTree
from carabiner import print_err
from carabiner.cast import cast
from carabiner.itertools import tenumerate
from carabiner.pd import read_table, write_stream
from pandas import DataFrame, read_excel
from rdkit.Chem import SDMolSupplier
from .converting import _mol2isomeric_canonical_smiles
def _mutate_df_stream(input_file: Union[str, TextIO],
                      output_file: Union[str, TextIO],
                      function: Callable[[DataFrame], Union[DataFrame, Tuple[Any, DataFrame]]],
                      file_format: Optional[str] = None,
                      chunksize: int = 1000) -> List[Any]:
    """Apply a function to a table chunk by chunk, streaming the results out.

    The input is read with ``read_table`` in chunks of ``chunksize``, each
    chunk is passed to ``function``, and the resulting table is handed to
    ``write_stream``. The first chunk is written with a header in mode
    ``'w'``; later chunks are appended without a header.

    Parameters
    ----------
    input_file : str or TextIO
        Table to read.
    output_file : str or TextIO
        Destination for the transformed table.
    function : Callable
        Called once per chunk. May return either a ``(carry, DataFrame)``
        pair or a bare ``DataFrame``; in the latter case the carry is ``0``.
    file_format : str, optional
        Passed as ``format`` to both ``read_table`` and ``write_stream``.
    chunksize : int
        Passed to ``read_table``.

    Returns
    -------
    list
        One carry value per chunk, in chunk order.

    """
    carries = []
    chunks = read_table(
        input_file,
        format=file_format,
        progress=False,
        chunksize=chunksize,
    )
    for i, chunk in tenumerate(chunks):
        result = function(chunk)
        if isinstance(result, DataFrame):
            # A bare DataFrame must not go through tuple unpacking: iterating
            # a DataFrame yields its column labels, so a two-column frame
            # would silently "unpack" into two label strings.
            carry, df = 0, result
        else:
            try:
                carry, df = result
            except ValueError:
                # Not a 2-item result; treat the whole return value as the table.
                df = result
                carry = 0
        is_first_chunk = i == 0
        write_stream(
            df,
            output=output_file,
            format=file_format,
            header=is_first_chunk,
            mode='w' if is_first_chunk else 'a',
        )
        carries.append(carry)
    return carries
[docs]
def read_weird_xml(filename: Union[str, TextIO],
                   header: bool = True,
                   namespace: str = '{urn:schemas-microsoft-com:office:spreadsheet}') -> DataFrame:
    """Read the rows of a spreadsheet-style XML document into a DataFrame.

    Every character not in ``string.printable`` (which includes all
    non-ASCII characters) is removed before parsing. Each ``Row`` element
    under ``namespace`` becomes one table row, built from the text of its
    ``Data`` elements. Values are kept as the raw element text.

    Parameters
    ----------
    filename : str or TextIO
        File to read.
    header : bool
        If True, the first row supplies the column names. Otherwise columns
        are named ``X0``, ``X1``, ... and the first row is kept as data.
    namespace : str
        XML namespace prefix, in ``{uri}`` form, of the ``Row`` and ``Data``
        elements.

    Returns
    -------
    DataFrame
        One column per entry of the first row; empty if there are no rows.

    Raises
    ------
    xml.etree.ElementTree.ParseError
        If the cleaned text is not well-formed XML. The location and nearby
        text are reported through ``print_err`` before re-raising.

    """
    with cast(filename, TextIO, mode='r') as f:
        xml_string = ''.join(filter(printable.__contains__, f.read()))

    try:
        root = ElementTree.fromstring(xml_string)
    except ElementTree.ParseError as e:
        # Report where the parser failed; position is (1-based line, 0-based column).
        line_no, col_no = e.position
        message = f'\n!!! XML parse error at line {line_no}, column {col_no}'
        lines = xml_string.split('\n')
        if 0 < line_no <= len(lines):
            snippet = lines[line_no - 1][max(col_no - 40, 0):col_no + 40]
            message += f': {snippet!r}'
        print_err(message)
        raise

    # Initialised up front so a document without any rows yields an empty table.
    heading: List[Any] = []
    df: dict = {}
    for i, row in enumerate(root.iter(f'{namespace}Row')):
        this_row = [datum.text for datum in row.iter(f'{namespace}Data')]
        if i == 0:
            if header:
                heading = this_row
                df = {colname: [] for colname in heading}
            else:
                heading = [f'X{j}' for j, _ in enumerate(this_row)]
                df = {colname: [datum] for colname, datum in zip(heading, this_row)}
        else:
            # zip() drops any cells beyond the number of known columns.
            for colname, datum in zip(heading, this_row):
                df[colname].append(datum)
    return DataFrame(df)
[docs]
def read_sdf(filename: Union[str, TextIO]) -> DataFrame:
    """Read the molecules of an SDF file into a DataFrame.

    The file is read as text with ``errors='replace'`` and copied to a
    temporary file, which is then parsed with RDKit's ``SDMolSupplier``.
    Each molecule contributes one row holding its properties plus a
    ``SMILES`` column computed by ``_mol2isomeric_canonical_smiles``.
    Entries the supplier yields as ``None`` are skipped. Properties missing
    from a molecule are filled with ``None``.

    Parameters
    ----------
    filename : str or TextIO
        SDF file to read.

    Returns
    -------
    DataFrame
        One row per parsed molecule, one column per property seen.

    Raises
    ------
    ValueError
        If the collected columns end up with different lengths.

    """
    filename = cast(filename, str)
    df = defaultdict(list)
    n_rows = 0  # molecules added so far; used to back-fill late-appearing columns
    with open(filename, 'r', errors='replace') as f, NamedTemporaryFile("w") as o:
        o.write(f.read())
        # Make sure the copy is on disk before RDKit opens it by name.
        o.flush()
        for mol in SDMolSupplier(o.name):
            if mol is None:
                continue
            propdict = mol.GetPropsAsDict()
            propdict['SMILES'] = _mol2isomeric_canonical_smiles(mol)
            for colname, value in propdict.items():
                if colname not in df:
                    # First time this property is seen: earlier molecules lacked it.
                    df[colname].extend([None] * n_rows)
                df[colname].append(value)
            for colname, values in df.items():
                if colname not in propdict:
                    values.append(None)
            n_rows += 1

    # Sanity check: every column should now have one entry per molecule.
    col_lengths = {col: len(val) for col, val in df.items()}
    if len(set(col_lengths.values())) > 1:
        raise ValueError("Column lengths not all the same:\n\t" +
                         '\n\t'.join(f"{key}:{val}" for key, val in col_lengths.items()))
    return DataFrame(df)
# Lookup table from a format key to the callable that loads that format.
FILE_READERS = {
    # XML using the Microsoft Office spreadsheet namespace by default.
    'bad_xml': read_weird_xml,
    # Excel workbooks, read by pandas with the openpyxl engine.
    'xlsx': partial(read_excel, engine='openpyxl'),
    # SDF molecule files.
    'sdf': read_sdf,
}