# coding=utf-8
import os
import re
from zipfile import ZipFile
import dateutil.parser as parser
import pytz
from struct import unpack
import numpy as np
import pandas as pd
[docs]class ForceArchive:
"""
Object to handle reading contents of a jpk-force zipped file.
- **Methods**
- ls: list archive contents
- read_properties: read utf-8 string decoded content of a property file, one property per list entry
- read_data: read encoded raw data, must be converted to appropriate physical quantity!
"""
# noinspection SpellCheckingInspection
def __init__(self, file_path):
self._zip_file = ZipFile(file_path)
if not self.read_properties('header.properties')['jpk-data-file'] == 'spm-forcefile':
raise ValueError("not a valid spm-forcefile!")
self.contents = self.ls()
[docs] def ls(self):
"""List all files contained in this force-archive"""
return self._zip_file.infolist()
[docs] def read_properties(self, content_path):
"""
Reads a property file form the force-archive.
The contents of the property file are elements of a list. Each entry is already decoded to utf-8.
:param content_path: internal path to the force-archive file
:type content_path: str
:return: property list
:rtype: dict
"""
if not os.path.basename(content_path).endswith(".properties"):
raise ValueError("this content path is not a property file")
try:
with self._zip_file.open(content_path) as file:
content = [line.decode('utf-8') for line in file.read().splitlines()]
# parse prop dictionary (without header date)
props = {}
for line in content[1:]:
key, value = line.split("=")
props[key] = value
# parse measurement date-time
fmt = '%Y-%m-%d %H:%M:%S %Z%z'
utc = pytz.utc
props["timestamp"] = utc.localize(parser.parse(content[0][1:], dayfirst=True)).strftime(fmt)
return props
except IOError:
print("can't read property file")
[docs] def read_data(self, content_path):
"""
Reads the raw integer-encoded data of the specified data file inside a force-archive.
:param content_path: internal path to the force-archive file
:type content_path: str
:return: raw data
:rtype: numpy.ndarray
"""
if not os.path.basename(content_path).endswith(".dat"):
raise ValueError("this content path is not a data file")
try:
# read binary data
data = self._zip_file.read(content_path)
# decode using big-endian integer
result = []
for i in range(int(len(data) / 4)):
result.append(unpack('!i', data[i * 4:(i + 1) * 4]))
# returning integer-encoded raw data vector
return np.array(result)
except IOError:
print("can't read data file")
[docs]class Properties:
"""
Object to automatically extract and conveniently use relevant JPK force file header information.
This comprises things like conversion factors for raw data, units, and so on
- **attributes**
- vDeflection_channel_number: internal number of vDeflection channel raw data
- conversion_factors: dictionary containing important information
- units: dictionary containing channel units
- **example usage**::
>>> force_file = r"path/to/jpk-force-file"
>>> props = Properties(file_path=force_file)
>>> print(props.units["vDeflection"])
V
>>> print(props.conversion_factors["vDeflection"]["force multiplier"])
0.01529211140472191
"""
def __init__(self, file_path):
# parse file path to header.properties file
self.file_path = file_path
# load the property file (you have to instantiate and load subsequently)
self.general = self.load_general_props()
self.segments = self.extract_segment_props()
# set vDeflection channel number, always extract freshly because channel numbering seems to be inconsistent
self.channel_numbers = self.get_channel_numbers()
# extract raw conversion factors and other specifications like units and the lik
self.conversion_factors = self.extract_conversion_factors()
self.units = {}
self.extract_specs()
[docs] def load_general_props(self):
"""
This actually loads the props file on disk from jpk-force zip-file. Parses all java-properties info and the
timestamp from the header of the header.
:return: props dictionary
:rtype: dict
"""
# load general and shared header.properties file from zipfile
root = ForceArchive(self.file_path).read_properties('header.properties')
shared = ForceArchive(self.file_path).read_properties('shared-data/header.properties')
full = {}
full.update(root)
full.update(shared)
return full
# noinspection PyPep8Naming
[docs] def get_channel_numbers(self):
"""
Extracts the channel numbers for each channel.
:return: dictionary with channel numbers
:rtype: dict
"""
channel_numbers = {"vDeflection": None, "hDeflection": None, "height": None, "capacitiveSensorHeight": None}
for key, value in self.general.items():
if value == "vDeflection":
channel_numbers[value] = re.search(r'(?<=lcd-info\.)\d(?=\.channel.name)', key).group()
if value == "hDeflection":
channel_numbers[value] = re.search(r'(?<=lcd-info\.)\d(?=\.channel.name)', key).group()
if value == "height":
channel_numbers[value] = re.search(r'(?<=lcd-info\.)\d(?=\.channel.name)', key).group()
if value == "capacitiveSensorHeight":
channel_numbers[value] = re.search(r'(?<=lcd-info\.)\d(?=\.channel.name)', key).group()
return channel_numbers
# noinspection PyPep8Naming
# noinspection SpellCheckingInspection,PyPep8Naming
@staticmethod
[docs] def convert_segment_name(jpk_name):
"""Convert JPKs segment names to useful ones"""
if jpk_name == 'extend':
real_name = 'approach'
elif jpk_name == 'pause-at-end':
real_name = 'contact'
elif jpk_name == 'pause-at-start':
real_name = 'pause'
else:
real_name = jpk_name
return real_name
[docs]class CellHesion:
# noinspection SpellCheckingInspection
"""
This is the main data-class that provides all functionality to load, analyze and display a single JPK
CellHesion200 force file archive.
**Attributes**
The following attributes are available:
- archive: an instance of :class:`.ForceArchive`
- properties: an instance of :class:`.Properties`
- data: :class:`pandas:pandas.DataFrame`
**Example Usage**
>>> jpk_file = r'path/to/jpk-force/file'
>>> sample = CellHesion(force_file=jpk_file)
>>> import matplotlib.pyplot as plt
>>> x = sample.data.retract.height * 10**6
>>> y = sample.data.retract.force * 10**12
>>> plt.plot(x, y)
>>> plt.xlabel("height [µm]"); plt.ylabel("force [pN]")
"""
def __init__(self, force_file):
# parse and check file path
if os.path.isfile(force_file):
self.file = force_file
else:
raise ValueError("file does not exist")
#
self.archive = ForceArchive(file_path=self.file)
self.properties = Properties(file_path=self.file)
#
self.data = self.load_data()
# noinspection PyPep8Naming
[docs] def load_encoded_data_segment(self, segment):
"""
Loads the raw, encoded vertical deflection and height data of the specified segment.
This has to be converted using :func:`convert_data` to make use of it.
:param segment: data segment to load
:type segment: str
:return: vDeflection and height
"""
# get data locations
segment_number = self.properties.segments[segment]['segment_number']
vDeflection_file = 'segments/{}/channels/vDeflection.dat'.format(segment_number)
height_file = 'segments/{}/channels/height.dat'.format(segment_number)
# load encoded data from archive
vDeflection = self.archive.read_data(vDeflection_file)
height = self.archive.read_data(height_file)
return vDeflection, height
# noinspection PyPep8Naming
[docs] def load_data(self):
"""
Load converted data to DataFrame. See :func:`construct_df` for DataFrame structure.
:return: force/height data
:rtype: pandas.DataFrame
"""
df = self.construct_df()
for segment in list(self.properties.segments.keys()):
# load raw data
vDeflection_raw, height_raw = self.load_encoded_data_segment(segment)
# convert data to normal physical units
vDeflection = self.convert_data('vDeflection', vDeflection_raw)
height = self.convert_data('height', height_raw)
df.loc[:, (segment, 'force')] = pd.Series(vDeflection.squeeze())
df.loc[:, (segment, 'height')] = pd.Series(height.squeeze())
return df
[docs] def convert_data(self, channel, data):
"""
Convert specific data from specific channel from encoded integer format to physical quantity.
Each channel has it's own conversion factors and formulas, so the correct channel has to be provided.
:param channel: data channel
:type channel: str
:param data: encoded data
:type data: numpy.ndarray
:return: converted data
:rtype: numpy.array
"""
if not isinstance(data, np.ndarray):
raise ValueError("data has to be numpy array")
# convert vDeflection from encoded to distance to force with linear conversion factors
# the returned object is already a numpy ndarray in unit Newton (N)
if channel == 'vDeflection':
raw_m = self.properties.conversion_factors[channel]["raw multiplier"]
raw_n = self.properties.conversion_factors[channel]["raw offset"]
dist_m = self.properties.conversion_factors[channel]["distance multiplier"]
dist_n = self.properties.conversion_factors[channel]["distance offset"]
force_m = self.properties.conversion_factors[channel]["force multiplier"]
force_n = self.properties.conversion_factors[channel]["force offset"]
converted_data = ((raw_m * data + raw_n) * dist_m + dist_n) * force_m + force_n
return converted_data
# convert height from encoded to calibrated height
# the returned object is already a numpy ndarray in unit Meter (m)
elif channel == 'height':
raw_m = self.properties.conversion_factors[channel]["raw multiplier"]
raw_n = self.properties.conversion_factors[channel]["raw offset"]
cal_m = self.properties.conversion_factors[channel]["calibrated multiplier"]
cal_n = self.properties.conversion_factors[channel]["calibrated offset"]
converted_data = (raw_m * data + raw_n) * cal_m + cal_n
return converted_data
else:
raise ValueError("not a valid channel")
@staticmethod
[docs] def construct_df():
"""
Construct a pandas DataFrame to store force and height data for each segment.
:return: DataFrame blueprint
:rtype: pandas.DataFrame
"""
iterable = [['approach', 'contact', 'retract', 'pause'], ['force', 'height']]
index = pd.MultiIndex.from_product(iterable, names=['segment', 'channel'])
return pd.DataFrame(columns=index)