Using AWS Textract to parse ridiculous FAA documents into CSV

By Ryan Romanchuk
On

I wanted to model Type Code Designators, and it was surprisingly difficult to find a complete, developer-friendly dataset. I present to you the 814-page PDF ORDER JO 7360.1E

Cover Page ORDER JO 7360.1E

Using StartDocumentTextDetection[1], I fed "Appendix A. Aircraft Type Designator" through a TABLES analysis, then gathered the response results and fed them through a modified text_extractor.py to dump them to CSV.

python3 text_extractor.py analyzeDocResponse.json

Finally, I brute-forced my way through the machine-learn'd results, jamming a never-ending supply of acronyms and abbreviations into a relational database. VOILÀ https://romanch.uk/aircraft_type_designators?q=piper.

namespace :icaos do
  # Reads ICAO_AircraftType.csv (expected two directories above this file's
  # directory) and upserts one AircraftTypeDesignator per row that has a value
  # in its first column. Each raw row, the derived attributes and the upsert
  # result are printed with `ap` as the task runs.
  desc 'Load ICAO type codes'
  task ingest: :environment do
    require 'csv' # loaded lazily so only this task pays for it

    # A local instead of constants: constants assigned inside a block are
    # defined on Object and trigger "already initialized constant" warnings
    # if this file is ever loaded twice.
    csv_path = File.join(File.expand_path('../../', __dir__), 'ICAO_AircraftType.csv')

    CSV.foreach(csv_path) do |row|
      ap row
      next unless row[0] # skip rows without an ICAO code in the first column

      # Column 2 is split on '/' into an engines part and a weight class.
      engines, weight_class = row[2]&.split('/')
      # The engines part is split per character; only the first two characters
      # are used (count, then type).
      # NOTE(review): assumes a single-digit engine count — confirm against the data.
      number_of_engines, engine_type = engines&.chars

      params = {
        icao_code: row[0],
        aircraft_class: row[1],
        engine_type: engine_type,
        number_of_engines: number_of_engines,
        weight_class: weight_class,
        icao_wtc: row[3],
        recat: row[4],
        recat_wtc_a: row[5],
        recat_wtc_b: row[6],
        cwtc: row[7],
        srs: row[8],
        lahso: row[9],
        manufacturer: row[10],
        model: row[11],
        # Every non-blank column after the twelfth is kept as an extra model name.
        data: { other_models: row.drop(12).compact_blank }
      }

      ap params
      ap AircraftTypeDesignator.upsert(params)
    end
  end
end
# text_extractor.py
import json
import sys
from pprint import pprint


def get_rows_columns_map(table_result, blocks_map):
    """Collect the text of a table's cells into a nested dict.

    Walks every id listed under the table's 'CHILD' relationships, looks the
    block up in ``blocks_map`` and, for blocks whose 'BlockType' is 'CELL',
    stores the cell text under ``result[RowIndex][ColumnIndex]``.

    Returns a dict mapping row index -> {column index -> cell text}.
    """
    grid = {}
    child_links = (
        link for link in table_result['Relationships'] if link['Type'] == 'CHILD'
    )
    for link in child_links:
        for block_id in link['Ids']:
            block = blocks_map[block_id]
            if block['BlockType'] != 'CELL':
                continue  # only CELL children contribute to the grid
            row_no, col_no = block['RowIndex'], block['ColumnIndex']
            # setdefault creates the row dict the first time a row is seen
            grid.setdefault(row_no, {})[col_no] = get_text(block, blocks_map)
    return grid


def get_text(result, blocks_map):
    """Build the text of a block from its CHILD blocks.

    Each child of type 'WORD' contributes its 'Text' followed by a space; each
    child of type 'SELECTION_ELEMENT' whose 'SelectionStatus' is 'SELECTED'
    contributes 'X '. Returns '' when the block has no 'Relationships' key.
    """
    if 'Relationships' not in result:
        return ''

    pieces = []
    for link in result['Relationships']:
        if link['Type'] != 'CHILD':
            continue
        for block_id in link['Ids']:
            child = blocks_map[block_id]
            kind = child['BlockType']
            if kind == 'WORD':
                pieces.append(child['Text'] + ' ')
            elif kind == 'SELECTION_ELEMENT' and child['SelectionStatus'] == 'SELECTED':
                pieces.append('X ')
    return ''.join(pieces)


def get_table_csv_results(file_name):
    """Load a saved JSON response from ``file_name`` and render its tables as CSV.

    The file must contain a JSON object with a 'Blocks' list. All blocks are
    pretty-printed to stdout, indexed by 'Id', and every block whose
    'BlockType' is "TABLE" is rendered with ``generate_table_csv`` (tables are
    numbered from 1), each followed by two newlines.

    Returns the concatenated CSV text, or the literal string
    "<b> NO Table FOUND </b>" when the response contains no TABLE block.
    """
    with open(file_name, 'rb') as handle:
        response = json.load(handle)

    blocks = response['Blocks']
    pprint(blocks)

    # Single pass: index every block by id and remember the tables in order.
    by_id, tables = {}, []
    for blk in blocks:
        by_id[blk['Id']] = blk
        if blk['BlockType'] == "TABLE":
            tables.append(blk)

    if not tables:
        return "<b> NO Table FOUND </b>"

    return ''.join(
        generate_table_csv(tbl, by_id, position) + '\n\n'
        for position, tbl in enumerate(tables, start=1)
    )

def generate_table_csv(table_result, blocks_map, table_index):
    """Render one table block as CSV text.

    The output starts with a 'Table: Table_<table_index>' header and a blank
    line, then one line per table row, then three trailing newlines. Every
    cell is followed by a comma, so each row ends with a trailing comma.

    Cells are written through ``csv.writer`` so that text containing commas,
    double quotes or line breaks is quoted instead of silently shifting the
    columns; cells without such characters are written unchanged. Rows and
    columns are emitted in ascending index order regardless of the order in
    which the cells were listed in the response.

    Parameters:
        table_result: the TABLE block, passed to ``get_rows_columns_map``.
        blocks_map: dict of block id -> block, passed to ``get_rows_columns_map``.
        table_index: number used in the 'Table_<n>' header.

    Returns:
        The CSV text for this table as a str.
    """
    # Local imports: other functions in this file use ``csv`` as a variable
    # name, so the stdlib module is deliberately kept out of module scope.
    import csv
    import io

    rows = get_rows_columns_map(table_result, blocks_map)

    buffer = io.StringIO()
    buffer.write('Table: {0}\n\n'.format('Table_' + str(table_index)))

    writer = csv.writer(buffer, lineterminator='\n')
    for row_index in sorted(rows):
        cols = rows[row_index]
        cells = [cols[col_index] for col_index in sorted(cols)]
        # The extra empty field reproduces the trailing comma after the last cell.
        writer.writerow(cells + [''])

    buffer.write('\n\n\n')
    return buffer.getvalue()

def main(file_name):
    """Convert the saved response at ``file_name`` to CSV and write it to disk.

    The CSV text comes from ``get_table_csv_results``; it is written to
    ``file_name + '_output.csv'`` (overwriting any existing file) and the
    output path is printed to stdout.
    """
    csv_text = get_table_csv_results(file_name)
    destination = file_name + '_output.csv'

    with open(destination, mode="wt") as sink:
        sink.write(csv_text)

    # Tell the user where the result landed.
    print('CSV OUTPUT FILE: ', destination)


if __name__ == "__main__":
    # Script entry point: the first command-line argument is the path of the
    # saved JSON response to convert. Without it, exit with a usage message
    # rather than an IndexError traceback.
    if len(sys.argv) < 2:
        sys.exit('usage: python3 text_extractor.py <response.json>')
    main(sys.argv[1])

  1. https://docs.aws.amazon.com/textract/latest/dg/API_StartDocumentTextDetection.html ↩︎

talk